http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerIntegrationTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerIntegrationTest.java deleted file mode 100644 index 4c7e08c..0000000 --- a/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerIntegrationTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.nosql.solr; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - -import org.apache.solr.common.SolrDocument; -import org.testng.annotations.Test; - -import brooklyn.entity.basic.Entities; -import brooklyn.entity.proxying.EntitySpec; -import brooklyn.entity.trait.Startable; -import brooklyn.test.EntityTestUtils; -import brooklyn.util.collections.MutableMap; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; - -/** - * Solr integration tests. - * - * Test the operation of the {@link SolrServer} class. - */ -public class SolrServerIntegrationTest extends AbstractSolrServerTest { - - /** - * Test that a node starts and sets SERVICE_UP correctly. - */ - @Test(groups = "Integration") - public void canStartupAndShutdown() { - solr = app.createAndManageChild(EntitySpec.create(SolrServer.class)); - app.start(ImmutableList.of(testLocation)); - - EntityTestUtils.assertAttributeEqualsEventually(solr, Startable.SERVICE_UP, true); - Entities.dumpInfo(app); - - solr.stop(); - - EntityTestUtils.assertAttributeEqualsEventually(solr, Startable.SERVICE_UP, false); - } - - /** - * Test that a core can be created and used with SolrJ client. 
- */ - @Test(groups = "Integration") - public void testConnection() throws Exception { - solr = app.createAndManageChild(EntitySpec.create(SolrServer.class) - .configure(SolrServer.SOLR_CORE_CONFIG, ImmutableMap.of("example", "classpath://solr/example.tgz"))); - app.start(ImmutableList.of(testLocation)); - - EntityTestUtils.assertAttributeEqualsEventually(solr, Startable.SERVICE_UP, true); - - SolrJSupport client = new SolrJSupport(solr, "example"); - - Iterable<SolrDocument> results = client.getDocuments(); - assertTrue(Iterables.isEmpty(results)); - - client.addDocument(MutableMap.<String, Object>of("id", "1", "description", "first")); - client.addDocument(MutableMap.<String, Object>of("id", "2", "description", "second")); - client.addDocument(MutableMap.<String, Object>of("id", "3", "description", "third")); - client.commit(); - - results = client.getDocuments(); - assertEquals(Iterables.size(results), 3); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerLiveTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerLiveTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerLiveTest.java deleted file mode 100644 index 82fb107..0000000 --- a/software/nosql/src/test/java/brooklyn/entity/nosql/solr/SolrServerLiveTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.nosql.solr; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - -import java.util.Map; - -import org.apache.solr.common.SolrDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import brooklyn.entity.proxying.EntitySpec; -import brooklyn.entity.trait.Startable; -import brooklyn.test.EntityTestUtils; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.text.Strings; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; - -/** - * Solr live tests. - * - * Test the operation of the {@link SolrServer} class using the jclouds {@code rackspace-cloudservers-uk} - * and {@code aws-ec2} providers, with different OS images. The tests use the {@link SolrJSupport} class - * to exercise the node, and will need to have {@code brooklyn.jclouds.provider.identity} and {@code .credential} - * set, usually in the {@code .brooklyn/brooklyn.properties} file. 
- */ -public class SolrServerLiveTest extends AbstractSolrServerTest { - - private static final Logger log = LoggerFactory.getLogger(SolrServerLiveTest.class); - - @DataProvider(name = "virtualMachineData") - public Object[][] provideVirtualMachineData() { - return new Object[][] { // ImageId, Provider, Region, Description (for logging) - new Object[] { "eu-west-1/ami-0307d674", "aws-ec2", "eu-west-1", "Ubuntu Server 14.04 LTS (HVM), SSD Volume Type" }, - new Object[] { "LON/f9b690bf-88eb-43c2-99cf-391f2558732e", "rackspace-cloudservers-uk", "", "Ubuntu 12.04 LTS (Precise Pangolin)" }, - new Object[] { "LON/a84b1592-6817-42da-a57c-3c13f3cfc1da", "rackspace-cloudservers-uk", "", "CentOS 6.5 (PVHVM)" }, - }; - } - - @Test(groups = "Live", dataProvider = "virtualMachineData") - protected void testOperatingSystemProvider(String imageId, String provider, String region, String description) throws Exception { - log.info("Testing Solr on {}{} using {} ({})", new Object[] { provider, Strings.isNonEmpty(region) ? ":" + region : "", description, imageId }); - - Map<String, String> properties = MutableMap.of("imageId", imageId); - testLocation = app.getManagementContext().getLocationRegistry() - .resolve(provider + (Strings.isNonEmpty(region) ? 
":" + region : ""), properties); - solr = app.createAndManageChild(EntitySpec.create(SolrServer.class) - .configure(SolrServer.SOLR_CORE_CONFIG, ImmutableMap.of("example", "classpath://solr/example.tgz"))); - app.start(ImmutableList.of(testLocation)); - - EntityTestUtils.assertAttributeEqualsEventually(solr, Startable.SERVICE_UP, true); - - SolrJSupport client = new SolrJSupport(solr, "example"); - - Iterable<SolrDocument> results = client.getDocuments(); - assertTrue(Iterables.isEmpty(results)); - - client.addDocument(MutableMap.<String, Object>of("id", "1", "description", "first")); - client.addDocument(MutableMap.<String, Object>of("id", "2", "description", "second")); - client.addDocument(MutableMap.<String, Object>of("id", "3", "description", "third")); - client.commit(); - - results = client.getDocuments(); - assertEquals(Iterables.size(results), 3); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java new file mode 100644 index 0000000..ab158bd --- /dev/null +++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AbstractCassandraNodeTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode; +import org.testng.annotations.BeforeMethod; + +import brooklyn.entity.BrooklynAppLiveTestSupport; +import brooklyn.location.Location; + +/** + * Cassandra test framework for integration and live tests. + */ +public class AbstractCassandraNodeTest extends BrooklynAppLiveTestSupport { + + protected Location testLocation; + protected CassandraNode cassandra; + + @BeforeMethod(alwaysRun = true) + @Override + public void setUp() throws Exception { + super.setUp(); + testLocation = app.newLocalhostProvisioningLocation(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java new file mode 100644 index 0000000..b7587d7 --- /dev/null +++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/AstyanaxSupport.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; + +import brooklyn.entity.basic.Attributes; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.text.Identifiers; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.netflix.astyanax.AstyanaxContext; +import com.netflix.astyanax.Cluster; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.MutationBatch; +import com.netflix.astyanax.connectionpool.NodeDiscoveryType; +import com.netflix.astyanax.connectionpool.OperationResult; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; +import com.netflix.astyanax.connectionpool.exceptions.SchemaDisagreementException; +import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl; +import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor; +import com.netflix.astyanax.impl.AstyanaxConfigurationImpl; +import 
com.netflix.astyanax.model.Column; +import com.netflix.astyanax.model.ColumnFamily; +import com.netflix.astyanax.model.ColumnList; +import com.netflix.astyanax.serializers.StringSerializer; +import com.netflix.astyanax.thrift.ThriftFamilyFactory; + +/** + * Cassandra testing using Astyanax API. + */ +public class AstyanaxSupport { + private static final Logger log = LoggerFactory.getLogger(AstyanaxSupport.class); + + public final String clusterName; + public final String hostname; + public final int thriftPort; + + public AstyanaxSupport(CassandraNode node) { + this(node.getClusterName(), node.getAttribute(Attributes.HOSTNAME), node.getThriftPort()); + } + + public AstyanaxSupport(String clusterName, String hostname, int thriftPort) { + this.clusterName = clusterName; + this.hostname = hostname; + this.thriftPort = thriftPort; + } + + public AstyanaxContext<Keyspace> newAstyanaxContextForKeyspace(String keyspace) { + AstyanaxContext<Keyspace> context = new AstyanaxContext.Builder() + .forCluster(clusterName) + .forKeyspace(keyspace) + .withAstyanaxConfiguration(new AstyanaxConfigurationImpl() + .setDiscoveryType(NodeDiscoveryType.NONE)) + .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("BrooklynPool") + .setPort(thriftPort) + .setMaxConnsPerHost(1) + .setConnectTimeout(5000) // 5s if the unit is ms - TODO confirm intended timeout + .setSeeds(String.format("%s:%d", hostname, thriftPort))) + .withConnectionPoolMonitor(new CountingConnectionPoolMonitor()) + .buildKeyspace(ThriftFamilyFactory.getInstance()); + + context.start(); + return context; + } + + public AstyanaxContext<Cluster> newAstyanaxContextForCluster() { + AstyanaxContext<Cluster> context = new AstyanaxContext.Builder() + .forCluster(clusterName) + .withAstyanaxConfiguration(new AstyanaxConfigurationImpl() + .setDiscoveryType(NodeDiscoveryType.NONE)) + .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("BrooklynPool") + .setPort(thriftPort) + .setMaxConnsPerHost(1) + .setConnectTimeout(5000) // 5s if the unit is ms - TODO confirm intended timeout + 
.setSeeds(String.format("%s:%d", hostname, thriftPort))) + .withConnectionPoolMonitor(new CountingConnectionPoolMonitor()) + .buildCluster(ThriftFamilyFactory.getInstance()); + + context.start(); + return context; + } + + public static class AstyanaxSample extends AstyanaxSupport { + + public static class Builder { + protected CassandraNode node; + protected String clusterName; + protected String hostname; + protected Integer thriftPort; + protected String columnFamilyName = Identifiers.makeRandomId(8); + + public Builder node(CassandraNode val) { + this.node = val; + clusterName = node.getClusterName(); + hostname = node.getAttribute(Attributes.HOSTNAME); + thriftPort = node.getThriftPort(); + return this; + } + public Builder host(String clusterName, String hostname, int thriftPort) { + this.clusterName = clusterName; + this.hostname = hostname; + this.thriftPort = thriftPort; + return this; + } + public Builder columnFamilyName(String val) { + this.columnFamilyName = val; + return this; + } + public AstyanaxSample build() { + return new AstyanaxSample(this); + } + } + + public static Builder builder() { + return new Builder(); + } + + public final String columnFamilyName; + public final ColumnFamily<String, String> sampleColumnFamily; + + public AstyanaxSample(CassandraNode node) { + this(builder().node(node)); + } + + public AstyanaxSample(String clusterName, String hostname, int thriftPort) { + this(builder().host(clusterName, hostname, thriftPort)); + } + + protected AstyanaxSample(Builder builder) { + super(builder.clusterName, builder.hostname, builder.thriftPort); + columnFamilyName = checkNotNull(builder.columnFamilyName, "columnFamilyName"); + sampleColumnFamily = new ColumnFamily<String, String>( + columnFamilyName, // Column Family Name + StringSerializer.get(), // Key Serializer + StringSerializer.get()); // Column Serializer + } + + /** + * Exercise the {@link CassandraNode} using the Astyanax API. 
+ */ + public void astyanaxTest() throws Exception { + String keyspaceName = "BrooklynTests_"+Identifiers.makeRandomId(8); + writeData(keyspaceName); + readData(keyspaceName); + } + + /** + * Write to a {@link CassandraNode} using the Astyanax API. + * @throws ConnectionException + */ + public void writeData(String keyspaceName) throws ConnectionException { + // Create context + AstyanaxContext<Keyspace> context = newAstyanaxContextForKeyspace(keyspaceName); + try { + Keyspace keyspace = context.getEntity(); + try { + checkNull(keyspace.describeKeyspace().getColumnFamily(columnFamilyName), "key space for column family "+columnFamilyName); + } catch (Exception ek) { + // (Re) Create keyspace if needed (including if family name already existed, + // e.g. due to a timeout on previous attempt) + log.debug("repairing Cassandra error by re-creating keyspace "+keyspace+": "+ek); + try { + log.debug("dropping Cassandra keyspace "+keyspace); + keyspace.dropKeyspace(); + } catch (Exception e) { + /* Ignore */ + log.debug("Cassandra keyspace "+keyspace+" could not be dropped (probably did not exist): "+e); + } + try { + keyspace.createKeyspace(ImmutableMap.<String, Object>builder() + .put("strategy_options", ImmutableMap.<String, Object>of("replication_factor", "1")) + .put("strategy_class", "SimpleStrategy") + .build()); + } catch (SchemaDisagreementException e) { + // discussion (but not terribly helpful) at http://stackoverflow.com/questions/6770894/schemadisagreementexception + // let's just try again after a delay + // (seems to have no effect; trying to fix by starting first node before others) + log.warn("error creating Cassandra keyspace "+keyspace+" (retrying): "+e); + Time.sleep(Duration.FIVE_SECONDS); + keyspace.createKeyspace(ImmutableMap.<String, Object>builder() + .put("strategy_options", ImmutableMap.<String, Object>of("replication_factor", "1")) + .put("strategy_class", "SimpleStrategy") + .build()); + } + } + + 
assertNull(keyspace.describeKeyspace().getColumnFamily("Rabbits"), "key space for arbitrary column family Rabbits"); + assertNull(keyspace.describeKeyspace().getColumnFamily(columnFamilyName), "key space for column family "+columnFamilyName); + + // Create column family + keyspace.createColumnFamily(sampleColumnFamily, null); + + // Insert rows + MutationBatch m = keyspace.prepareMutationBatch(); + m.withRow(sampleColumnFamily, "one") + .putColumn("name", "Alice", null) + .putColumn("company", "Cloudsoft Corp", null); + m.withRow(sampleColumnFamily, "two") + .putColumn("name", "Bob", null) + .putColumn("company", "Cloudsoft Corp", null) + .putColumn("pet", "Cat", null); + + OperationResult<Void> insert = m.execute(); + assertEquals(insert.getHost().getHostName(), hostname); + assertTrue(insert.getLatency() > 0L); + } finally { + context.shutdown(); + } + } + + /** + * Read from a {@link CassandraNode} using the Astyanax API. + * @throws ConnectionException + */ + public void readData(String keyspaceName) throws ConnectionException { + // Create context + AstyanaxContext<Keyspace> context = newAstyanaxContextForKeyspace(keyspaceName); + try { + Keyspace keyspace = context.getEntity(); + + // Query data + OperationResult<ColumnList<String>> query = keyspace.prepareQuery(sampleColumnFamily) + .getKey("one") + .execute(); + assertEquals(query.getHost().getHostName(), hostname); + assertTrue(query.getLatency() > 0L); + + ColumnList<String> columns = query.getResult(); + assertEquals(columns.size(), 2); + + // Lookup columns in response by name + String name = columns.getColumnByName("name").getStringValue(); + assertEquals(name, "Alice"); + + // Iterate through the columns + for (Column<String> c : columns) { + assertTrue(ImmutableList.of("name", "company").contains(c.getName())); + } + } finally { + context.shutdown(); + } + } + + + /** + * Returns the keyspace name to which the data has been written. 
If it fails the first time, + * then will increment the keyspace name (the first attempt uses {@code keyspacePrefix} as-is, retry N appends {@code "_N"}). This is because the failure could be a response timeout, + * where the keyspace really has been created so subsequent attempts with the same name will + * fail (because we assert that the keyspace did not exist). + */ + public String writeData(String keyspacePrefix, int numRetries) throws ConnectionException { + int retryCount = 0; + while (true) { + try { + String keyspaceName = keyspacePrefix + (retryCount > 0 ? "_"+retryCount : ""); + writeData(keyspaceName); + return keyspaceName; + } catch (Exception e) { + log.warn("Error writing data - attempt "+(retryCount+1)+" of "+(numRetries+1)+": "+e, e); + if (++retryCount > numRetries) + throw Exceptions.propagate(e); + } + } + } + + /** + * Repeatedly tries to read data from the given keyspace name. Asserts that the data is the + * same as would be written by calling {@code writeData(keyspaceName)}. + */ + public void readData(String keyspaceName, int numRetries) throws ConnectionException { + int retryCount = 0; + while (true) { + try { + readData(keyspaceName); + return; + } catch (Exception e) { + log.warn("Error reading data - attempt "+(retryCount+1)+" of "+(numRetries+1)+": "+e, e); + if (++retryCount > numRetries) + throw Exceptions.propagate(e); + } + } + } + + /** + * Like {@link Assert#assertNull(Object, String)}, except throws IllegalStateException instead + */ + private void checkNull(Object obj, String msg) { + if (obj != null) { + throw new IllegalStateException("Not null: "+msg+"; obj="+obj); + } + } + } + + public static void main(String[] args) throws Exception { + AstyanaxSample support = new AstyanaxSample("ignored", "ec2-79-125-32-2.eu-west-1.compute.amazonaws.com", 9160); + AstyanaxContext<Cluster> context = support.newAstyanaxContextForCluster(); + try { + System.out.println(context.getEntity().describeSchemaVersions()); + } finally { + context.shutdown(); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java new file mode 100644 index 0000000..ddd6243 --- /dev/null +++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterIntegrationTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.math.BigInteger; + +import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter; +import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode; +import org.apache.brooklyn.entity.nosql.cassandra.TokenGenerators.PosNeg63TokenGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.BrooklynAppLiveTestSupport; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.trait.Startable; +import brooklyn.location.Location; +import brooklyn.test.Asserts; +import brooklyn.test.EntityTestUtils; +import brooklyn.util.collections.MutableMap; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +/** + * An integration test of the {@link CassandraDatacenter} entity. + * + * Tests that a one node cluster can be started on localhost and data can be written/read, using the Astyanax API. + * + * NOTE: If these tests fail with "Timeout waiting for SERVICE_UP" and "java.lang.IllegalStateException: Unable to contact any seeds!" + * or "java.lang.RuntimeException: Unable to gossip with any seeds" appears in the log, it may be that the broadcast_address + * (set to InetAddress.getLocalHost().getHostName()) is not resolving to the value specified in listen_address + * (InetAddress.getLocalHost().getHostAddress()). You can work round this issue by ensuring that your machine has only one + * address, e.g. 
by disabling wireless if you are also using a wired connection + */ +public class CassandraDatacenterIntegrationTest extends BrooklynAppLiveTestSupport { + + private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterIntegrationTest.class); + + protected Location testLocation; + protected CassandraDatacenter cluster; + + @BeforeMethod(alwaysRun = true) + @Override + public void setUp() throws Exception { + CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually(); + super.setUp(); + testLocation = app.newLocalhostProvisioningLocation(); + } + + @AfterMethod(alwaysRun=true) + @Override + public void tearDown() throws Exception { + super.tearDown(); + CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually(); + } + + + @Test(groups = "Integration") + public void testStartAndShutdownClusterSizeOne() throws Exception { + EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class) + .configure("initialSize", 1) + .configure("tokenShift", 42); + runStartAndShutdownClusterSizeOne(spec, true); + } + + /** + * Cassandra v2 needs Java >= 1.7. If you have Java 6 as the default locally, then you can use + * something like {@code .configure("shell.env", MutableMap.of("JAVA_HOME", "/Library/Java/JavaVirtualMachines/jdk1.7.0_51.jdk/Contents/Home"))} + */ + @Test(groups = "Integration") + public void testStartAndShutdownClusterSizeOneCassandraVersion2() throws Exception { + String version = "2.0.9"; + + EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class) + .configure(CassandraNode.SUGGESTED_VERSION, version) + .configure("initialSize", 1); + runStartAndShutdownClusterSizeOne(spec, false); + } + + /** + * Test that a single node cluster starts up and allows access via the Astyanax API. + * Only one node because Cassandra can only run one node per VM! 
+ */ + protected void runStartAndShutdownClusterSizeOne(EntitySpec<CassandraDatacenter> datacenterSpec, final boolean assertToken) throws Exception { + cluster = app.createAndManageChild(datacenterSpec); + assertEquals(cluster.getCurrentSize().intValue(), 0); + + app.start(ImmutableList.of(testLocation)); + Entities.dumpInfo(app); + + final CassandraNode node = (CassandraNode) Iterables.get(cluster.getMembers(), 0); + String nodeAddr = checkNotNull(node.getAttribute(CassandraNode.HOSTNAME), "hostname") + ":" + checkNotNull(node.getAttribute(CassandraNode.THRIFT_PORT), "thriftPort"); + + EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.GROUP_SIZE, 1); + EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CASSANDRA_CLUSTER_NODES, ImmutableList.of(nodeAddr)); + + EntityTestUtils.assertAttributeEqualsEventually(node, Startable.SERVICE_UP, true); + if (assertToken) { + PosNeg63TokenGenerator tg = new PosNeg63TokenGenerator(); + tg.growingCluster(1); + EntityTestUtils.assertAttributeEqualsEventually(node, CassandraNode.TOKEN, tg.newToken().add(BigInteger.valueOf(42))); + } + + // may take some time to be consistent (with new thrift_latency checks on the node, + // contactability should not be an issue, but consistency still might be) + Asserts.succeedsEventually(MutableMap.of("timeout", 120*1000), new Runnable() { + public void run() { + boolean open = CassandraDatacenterLiveTest.isSocketOpen(node); + Boolean consistant = open ? CassandraDatacenterLiveTest.areVersionsConsistent(node) : null; + Integer numPeers = node.getAttribute(CassandraNode.PEERS); + Integer liveNodeCount = node.getAttribute(CassandraNode.LIVE_NODE_COUNT); + String msg = "consistency: " + + (!open ? "unreachable" : consistant==null ? 
"error" : consistant)+"; " + + "peer group sizes: "+numPeers + "; live node count: " + liveNodeCount; + assertTrue(open, msg); + assertEquals(consistant, Boolean.TRUE, msg); + if (assertToken) { + assertEquals(numPeers, (Integer)1, msg); + } else { + assertTrue(numPeers != null && numPeers >= 1, msg); + } + assertEquals(liveNodeCount, (Integer)1, msg); + }}); + + CassandraDatacenterLiveTest.checkConnectionRepeatedly(2, 5, ImmutableList.of(node)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java new file mode 100644 index 0000000..d29bc1a --- /dev/null +++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterLiveTest.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import java.math.BigInteger; +import java.net.Socket; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter; +import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode; +import org.apache.brooklyn.entity.nosql.cassandra.AstyanaxSupport.AstyanaxSample; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.BrooklynAppLiveTestSupport; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.trait.Startable; +import brooklyn.location.Location; +import brooklyn.test.Asserts; +import brooklyn.test.EntityTestUtils; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.text.Identifiers; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.netflix.astyanax.AstyanaxContext; +import com.netflix.astyanax.Cluster; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + +/** + * A live test of the {@link CassandraDatacenter} entity. + * + * Tests that a two node cluster can be started on Amazon EC2 and data written on one {@link CassandraNode} + * can be read from another, using the Astyanax API. 
+ */ +public class CassandraDatacenterLiveTest extends BrooklynAppLiveTestSupport { + + private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterLiveTest.class); + + private String provider = + "aws-ec2:eu-west-1"; +// "rackspace-cloudservers-uk"; +// "named:hpcloud-compute-at"; +// "localhost"; +// "jcloudsByon:(provider=\"aws-ec2\",region=\"us-east-1\",user=\"aled\",hosts=\"i-6f374743,i-35324219,i-1135453d\")"; + + protected Location testLocation; + protected CassandraDatacenter cluster; + + @BeforeMethod(alwaysRun = true) + @Override + public void setUp() throws Exception { + super.setUp(); + testLocation = mgmt.getLocationRegistry().resolve(provider); + } + + @AfterMethod(alwaysRun=true) + @Override + public void tearDown() throws Exception { + super.tearDown(); + } + + @Test(groups = "Live") + public void testDatacenter() throws Exception { + EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class) + .configure("initialSize", 2) + .configure("clusterName", "CassandraClusterLiveTest"); + runCluster(spec, false); + } + + @Test(groups = "Live") + public void testDatacenterWithVnodes() throws Exception { + EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class) + .configure("initialSize", 2) + .configure(CassandraDatacenter.USE_VNODES, true) + .configure("clusterName", "CassandraClusterLiveTest"); + runCluster(spec, true); + } + + /* + * TODO on some distros (e.g. CentOS?), it comes pre-installed with java 6. Installing java 7 + * didn't seem to be enough. I also had to set JAVA_HOME: + * .configure("shell.env", MutableMap.of("JAVA_HOME", "/etc/alternatives/java_sdk_1.7.0")) + * However, that would break other deployments such as on Ubuntu where JAVA_HOME would be different. 
+ */ + @Test(groups = "Live") + public void testDatacenterWithVnodesVersion2() throws Exception { + EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class) + .configure("initialSize", 2) + .configure(CassandraNode.SUGGESTED_VERSION, "2.0.9") + .configure(CassandraDatacenter.USE_VNODES, true) + .configure("clusterName", "CassandraClusterLiveTest"); + runCluster(spec, true); + } + + @Test(groups = {"Live", "Acceptance"}, invocationCount=10) + public void testManyTimes() throws Exception { + testDatacenter(); + } + + /** + * Test a Cassandra Datacenter: + * <ol> + * <li>Create two node datacenter + * <li>Confirm allows access via the Astyanax API through both nodes. + * <li>Confirm can size + * </ol> + */ + protected void runCluster(EntitySpec<CassandraDatacenter> datacenterSpec, boolean usesVnodes) throws Exception { + cluster = app.createAndManageChild(datacenterSpec); + assertEquals(cluster.getCurrentSize().intValue(), 0); + + app.start(ImmutableList.of(testLocation)); + + // Check cluster is up and healthy + EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.GROUP_SIZE, 2); + Entities.dumpInfo(app); + List<CassandraNode> members = castToCassandraNodes(cluster.getMembers()); + assertNodesConsistent(members); + + if (usesVnodes) { + assertVnodeTokensConsistent(members); + } else { + assertSingleTokenConsistent(members); + } + + // Can connect via Astyanax + checkConnectionRepeatedly(2, 5, members); + + // Resize + cluster.resize(3); + assertEquals(cluster.getMembers().size(), 3, "members="+cluster.getMembers()); + if (usesVnodes) { + assertVnodeTokensConsistent(castToCassandraNodes(cluster.getMembers())); + } else { + assertSingleTokenConsistent(castToCassandraNodes(cluster.getMembers())); + } + checkConnectionRepeatedly(2, 5, cluster.getMembers()); + } + + protected static List<CassandraNode> castToCassandraNodes(Collection<? 
extends Entity> rawnodes) { + final List<CassandraNode> nodes = Lists.newArrayList(); + for (Entity node : rawnodes) { + nodes.add((CassandraNode) node); + } + return nodes; + } + + protected static void assertNodesConsistent(final List<CassandraNode> nodes) { + final Integer expectedLiveNodeCount = nodes.size(); + // may take some time to be consistent (with new thrift_latency checks on the node, + // contactability should not be an issue, but consistency still might be) + Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Runnable() { + public void run() { + for (Entity n : nodes) { + CassandraNode node = (CassandraNode) n; + EntityTestUtils.assertAttributeEquals(node, Startable.SERVICE_UP, true); + String errmsg = "node="+node+"; hostname="+node.getAttribute(Attributes.HOSTNAME)+"; port="+node.getThriftPort(); + assertTrue(isSocketOpen(node), errmsg); + assertTrue(areVersionsConsistent(node), errmsg); + EntityTestUtils.assertAttributeEquals(node, CassandraNode.LIVE_NODE_COUNT, expectedLiveNodeCount); + } + }}); + } + + protected static void assertSingleTokenConsistent(final List<CassandraNode> nodes) { + final int numNodes = nodes.size(); + Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Runnable() { + public void run() { + Set<BigInteger> alltokens = Sets.newLinkedHashSet(); + for (Entity node : nodes) { + EntityTestUtils.assertAttributeEquals(node, Startable.SERVICE_UP, true); + EntityTestUtils.assertConfigEquals(node, CassandraNode.NUM_TOKENS_PER_NODE, 1); + EntityTestUtils.assertAttributeEquals(node, CassandraNode.PEERS, numNodes); + BigInteger token = node.getAttribute(CassandraNode.TOKEN); + Set<BigInteger> tokens = node.getAttribute(CassandraNode.TOKENS); + assertNotNull(token); + assertEquals(tokens, ImmutableSet.of(token)); + alltokens.addAll(tokens); + } + assertEquals(alltokens.size(), numNodes); + }}); + } + + protected static void assertVnodeTokensConsistent(final List<CassandraNode> nodes) 
{ + final int numNodes = nodes.size(); + final int tokensPerNode = Iterables.get(nodes, 0).getNumTokensPerNode(); + + Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Runnable() { + public void run() { + Set<BigInteger> alltokens = Sets.newLinkedHashSet(); + for (Entity node : nodes) { + EntityTestUtils.assertAttributeEquals(node, Startable.SERVICE_UP, true); + EntityTestUtils.assertAttributeEquals(node, CassandraNode.PEERS, tokensPerNode*numNodes); + EntityTestUtils.assertConfigEquals(node, CassandraNode.NUM_TOKENS_PER_NODE, 256); + BigInteger token = node.getAttribute(CassandraNode.TOKEN); + Set<BigInteger> tokens = node.getAttribute(CassandraNode.TOKENS); + assertNotNull(token); + assertEquals(tokens.size(), tokensPerNode, "tokens="+tokens); + alltokens.addAll(tokens); + } + assertEquals(alltokens.size(), tokensPerNode*numNodes); + }}); + } + + protected static void checkConnectionRepeatedly(int totalAttemptsAllowed, int numRetriesPerAttempt, Iterable<? extends Entity> nodes) throws Exception { + int attemptNum = 0; + while (true) { + try { + checkConnection(numRetriesPerAttempt, nodes); + return; + } catch (Exception e) { + attemptNum++; + if (attemptNum >= totalAttemptsAllowed) { + log.warn("Cassandra not usable, "+attemptNum+" attempts; failing: "+e, e); + throw e; + } + log.warn("Cassandra not usable (attempt "+attemptNum+" of "+totalAttemptsAllowed+"), trying again after delay: "+e, e); + Time.sleep(Duration.TEN_SECONDS); + } + } + } + + protected static void checkConnection(int numRetries, Iterable<? 
extends Entity> nodes) throws ConnectionException { + CassandraNode first = (CassandraNode) Iterables.get(nodes, 0); + + // have been seeing intermittent SchemaDisagreementException errors on AWS, probably due to Astyanax / how we are using it + // (confirmed that clocks are in sync) + String uniqueName = Identifiers.makeRandomId(8); + AstyanaxSample astyanaxFirst = AstyanaxSample.builder().node(first).columnFamilyName(uniqueName).build(); + Map<String, List<String>> versions; + AstyanaxContext<Cluster> context = astyanaxFirst.newAstyanaxContextForCluster(); + try { + versions = context.getEntity().describeSchemaVersions(); + } finally { + context.shutdown(); + } + + log.info("Cassandra schema versions are: "+versions); + if (versions.size() > 1) { + Assert.fail("Inconsistent versions on Cassandra start: "+versions); + } + String keyspacePrefix = "BrooklynTests_"+Identifiers.makeRandomId(8); + + String keyspaceName = astyanaxFirst.writeData(keyspacePrefix, numRetries); + + for (Entity node : nodes) { + AstyanaxSample astyanaxSecond = AstyanaxSample.builder().node((CassandraNode)node).columnFamilyName(uniqueName).build(); + astyanaxSecond.readData(keyspaceName, numRetries); + } + } + + protected static Boolean areVersionsConsistent(CassandraNode node) { + AstyanaxContext<Cluster> context = null; + try { + context = new AstyanaxSample(node).newAstyanaxContextForCluster(); + Map<String, List<String>> v = context.getEntity().describeSchemaVersions(); + return v.size() == 1; + } catch (Exception e) { + return null; + } finally { + if (context != null) context.shutdown(); + } + } + + protected static boolean isSocketOpen(CassandraNode node) { + try { + Socket s = new Socket(node.getAttribute(Attributes.HOSTNAME), node.getThriftPort()); + s.close(); + return true; + } catch (Exception e) { + return false; + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java new file mode 100644 index 0000000..4c2a248 --- /dev/null +++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterRebindIntegrationTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import static org.testng.Assert.assertNotNull; + +import java.math.BigInteger; +import java.util.Set; + +import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter; +import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.proxy.nginx.NginxController; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.rebind.RebindOptions; +import brooklyn.entity.rebind.RebindTestFixtureWithApp; +import brooklyn.entity.trait.Startable; +import brooklyn.location.basic.LocalhostMachineProvisioningLocation; +import brooklyn.test.EntityTestUtils; + +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +/** + * Test the operation of the {@link NginxController} class. + */ +public class CassandraDatacenterRebindIntegrationTest extends RebindTestFixtureWithApp { + private static final Logger LOG = LoggerFactory.getLogger(CassandraDatacenterRebindIntegrationTest.class); + + private LocalhostMachineProvisioningLocation localhostProvisioningLocation; + + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually(); + super.setUp(); + localhostProvisioningLocation = origApp.newLocalhostProvisioningLocation(); + } + + @AfterMethod(alwaysRun=true) + @Override + public void tearDown() throws Exception { + super.tearDown(); + CassandraNodeIntegrationTest.assertCassandraPortsAvailableEventually(); + } + + /** + * Test that Brooklyn can rebind to a single node datacenter. 
+ */ + @Test(groups = "Integration") + public void testRebindDatacenterOfSizeOne() throws Exception { + CassandraDatacenter origDatacenter = origApp.createAndManageChild(EntitySpec.create(CassandraDatacenter.class) + .configure("initialSize", 1)); + + origApp.start(ImmutableList.of(localhostProvisioningLocation)); + CassandraNode origNode = (CassandraNode) Iterables.get(origDatacenter.getMembers(), 0); + + EntityTestUtils.assertAttributeEqualsEventually(origDatacenter, CassandraDatacenter.GROUP_SIZE, 1); + CassandraDatacenterLiveTest.assertNodesConsistent(ImmutableList.of(origNode)); + CassandraDatacenterLiveTest.assertSingleTokenConsistent(ImmutableList.of(origNode)); + CassandraDatacenterLiveTest.checkConnectionRepeatedly(2, 5, ImmutableList.of(origNode)); + BigInteger origToken = origNode.getAttribute(CassandraNode.TOKEN); + Set<BigInteger> origTokens = origNode.getAttribute(CassandraNode.TOKENS); + assertNotNull(origToken); + + newApp = rebind(RebindOptions.create().terminateOrigManagementContext(true)); + final CassandraDatacenter newDatacenter = (CassandraDatacenter) Iterables.find(newApp.getChildren(), Predicates.instanceOf(CassandraDatacenter.class)); + final CassandraNode newNode = (CassandraNode) Iterables.find(newDatacenter.getMembers(), Predicates.instanceOf(CassandraNode.class)); + + EntityTestUtils.assertAttributeEqualsEventually(newDatacenter, CassandraDatacenter.GROUP_SIZE, 1); + EntityTestUtils.assertAttributeEqualsEventually(newNode, Startable.SERVICE_UP, true); + EntityTestUtils.assertAttributeEqualsEventually(newNode, CassandraNode.TOKEN, origToken); + EntityTestUtils.assertAttributeEqualsEventually(newNode, CassandraNode.TOKENS, origTokens); + CassandraDatacenterLiveTest.assertNodesConsistent(ImmutableList.of(newNode)); + CassandraDatacenterLiveTest.assertSingleTokenConsistent(ImmutableList.of(newNode)); + CassandraDatacenterLiveTest.checkConnectionRepeatedly(2, 5, ImmutableList.of(newNode)); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java new file mode 100644 index 0000000..3a1d202 --- /dev/null +++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterTest.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import static org.testng.Assert.assertEquals; + +import java.math.BigInteger; +import java.util.Map; +import java.util.Set; + +import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter; +import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.BrooklynAppUnitTestSupport; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.EmptySoftwareProcess; +import brooklyn.entity.basic.EmptySoftwareProcessSshDriver; +import brooklyn.entity.basic.EntityInternal; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.location.LocationSpec; +import brooklyn.location.basic.LocalhostMachineProvisioningLocation; +import brooklyn.test.EntityTestUtils; +import brooklyn.util.ResourceUtils; +import brooklyn.util.javalang.JavaClassNames; +import brooklyn.util.text.TemplateProcessor; +import brooklyn.util.time.Duration; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +public class CassandraDatacenterTest extends BrooklynAppUnitTestSupport { + + private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterTest.class); + + private LocalhostMachineProvisioningLocation loc; + private CassandraDatacenter cluster; + + @BeforeMethod(alwaysRun=true) + @Override + public void setUp() throws Exception { + super.setUp(); + loc = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class)); + } + + @Test + public void testPopulatesInitialSeeds() throws Exception { + cluster = 
app.createAndManageChild(EntitySpec.create(CassandraDatacenter.class) + .configure(CassandraDatacenter.INITIAL_SIZE, 2) + .configure(CassandraDatacenter.TOKEN_SHIFT, BigInteger.ZERO) + .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO) + .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class))); + + app.start(ImmutableList.of(loc)); + EmptySoftwareProcess e1 = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 0); + EmptySoftwareProcess e2 = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 1); + + EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e1, e2)); + } + + @Test(groups="Integration") // because takes approx 2 seconds + public void testUpdatesSeedsOnFailuresAndAdditions() throws Exception { + doTestUpdatesSeedsOnFailuresAndAdditions(true, false); + } + + protected void doTestUpdatesSeedsOnFailuresAndAdditions(boolean fast, boolean checkSeedsConstantOnRejoining) throws Exception { + cluster = app.createAndManageChild(EntitySpec.create(CassandraDatacenter.class) + .configure(CassandraDatacenter.INITIAL_SIZE, 2) + .configure(CassandraDatacenter.TOKEN_SHIFT, BigInteger.ZERO) + .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO) + .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class))); + + app.start(ImmutableList.of(loc)); + EmptySoftwareProcess e1 = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 0); + EmptySoftwareProcess e2 = (EmptySoftwareProcess) Iterables.get(cluster.getMembers(), 1); + EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e1, e2)); + log.debug("Test "+JavaClassNames.niceClassAndMethod()+", cluster "+cluster+" has "+cluster.getMembers()+"; e1="+e1+" e2="+e2); + + // calling the driver stop for this entity will cause SERVICE_UP to become false, and stay false + 
// (and that's all it does, incidentally); if we just set the attribute it will become true on serviceUp sensor feed + ((EmptySoftwareProcess)e1).getDriver().stop(); + // not necessary, but speeds things up: + if (fast) + ((EntityInternal)e1).setAttribute(Attributes.SERVICE_UP, false); + + EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e2)); + + cluster.resize(3); + EmptySoftwareProcess e3 = (EmptySoftwareProcess) Iterables.getOnlyElement(Sets.difference(ImmutableSet.copyOf(cluster.getMembers()), ImmutableSet.of(e1,e2))); + log.debug("Test "+JavaClassNames.niceClassAndMethod()+", cluster "+cluster+" has "+cluster.getMembers()+"; e3="+e3); + try { + EntityTestUtils.assertAttributeEqualsEventually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e2, e3)); + } finally { + log.debug("Test "+JavaClassNames.niceClassAndMethod()+", cluster "+cluster+" has "+cluster.getMembers()+"; seeds "+cluster.getAttribute(CassandraDatacenter.CURRENT_SEEDS)); + } + + if (!checkSeedsConstantOnRejoining) { + // cluster should not revert to e1+e2, simply because e1 has come back; but e1 should rejoin the group + // (not that important, and waits for 1s, so only done as part of integration) + ((EmptySoftwareProcessSshDriver)(((EmptySoftwareProcess)e1).getDriver())).launch(); + if (fast) + ((EntityInternal)e1).setAttribute(Attributes.SERVICE_UP, true); + EntityTestUtils.assertAttributeEqualsEventually(e1, CassandraNode.SERVICE_UP, true); + EntityTestUtils.assertAttributeEqualsContinually(cluster, CassandraDatacenter.CURRENT_SEEDS, ImmutableSet.<Entity>of(e2, e3)); + } + } + + @Test + public void testPopulatesInitialTokens() throws Exception { + cluster = app.createAndManageChild(EntitySpec.create(CassandraDatacenter.class) + .configure(CassandraDatacenter.INITIAL_SIZE, 2) + .configure(CassandraDatacenter.TOKEN_SHIFT, BigInteger.ZERO) + .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, 
Duration.ZERO) + .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class))); + + app.start(ImmutableList.of(loc)); + + Set<BigInteger> tokens = Sets.newLinkedHashSet(); + Set<BigInteger> tokens2 = Sets.newLinkedHashSet(); + for (Entity member : cluster.getMembers()) { + BigInteger memberToken = member.getConfig(CassandraNode.TOKEN); + Set<BigInteger > memberTokens = member.getConfig(CassandraNode.TOKENS); + if (memberToken != null) tokens.add(memberToken); + if (memberTokens != null) tokens2.addAll(memberTokens); + } + assertEquals(tokens, ImmutableSet.of(new BigInteger("-9223372036854775808"), BigInteger.ZERO)); + assertEquals(tokens2, ImmutableSet.of()); + } + + @Test + public void testDoesNotPopulateInitialTokens() throws Exception { + cluster = app.createAndManageChild(EntitySpec.create(CassandraDatacenter.class) + .configure(CassandraDatacenter.INITIAL_SIZE, 2) + .configure(CassandraDatacenter.USE_VNODES, true) + .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO) + .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class))); + + app.start(ImmutableList.of(loc)); + + Set<BigInteger> tokens = Sets.newLinkedHashSet(); + Set<BigInteger> tokens2 = Sets.newLinkedHashSet(); + for (Entity member : cluster.getMembers()) { + BigInteger memberToken = member.getConfig(CassandraNode.TOKEN); + Set<BigInteger > memberTokens = member.getConfig(CassandraNode.TOKENS); + if (memberToken != null) tokens.add(memberToken); + if (memberTokens != null) tokens2.addAll(memberTokens); + } + assertEquals(tokens, ImmutableSet.of()); + assertEquals(tokens2, ImmutableSet.of()); + } + + public static class MockInputForTemplate { + public BigInteger getToken() { return new BigInteger("-9223372036854775808"); } + public String getTokensAsString() { return "" + getToken(); } + public int getNumTokensPerNode() { return 1; } + public String getSeeds() { return ""; } + public int getGossipPort() { return 
1234; } + public int getSslGossipPort() { return 1234; } + public int getThriftPort() { return 1234; } + public int getNativeTransportPort() { return 1234; } + public String getClusterName() { return "Mock"; } + public String getEndpointSnitchName() { return ""; } + public String getListenAddress() { return "0"; } + public String getBroadcastAddress() { return "0"; } + public String getRpcAddress() { return "0"; } + public String getRunDir() { return "/tmp/mock"; } + } + + @Test + public void testBigIntegerFormattedCorrectly() { + Map<String, Object> substitutions = ImmutableMap.<String, Object>builder() + .put("entity", new MockInputForTemplate()) + .put("driver", new MockInputForTemplate()) + .build(); + + String templatedUrl = CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL.getDefaultValue(); + String url = TemplateProcessor.processTemplateContents(templatedUrl, ImmutableMap.of("entity", ImmutableMap.of("majorMinorVersion", "1.2"))); + String templateContents = new ResourceUtils(this).getResourceAsString(url); + String processedTemplate = TemplateProcessor.processTemplateContents(templateContents, substitutions); + Assert.assertEquals(processedTemplate.indexOf("775,808"), -1); + Assert.assertTrue(processedTemplate.indexOf("-9223372036854775808") > 0); + } + + @Test(groups="Integration") // because takes approx 30 seconds + public void testUpdatesSeedsFastishManyTimes() throws Exception { + final int COUNT = 20; + for (int i=0; i<COUNT; i++) { + log.info("Test "+JavaClassNames.niceClassAndMethod()+", iteration "+(i+1)+" of "+COUNT); + try { + doTestUpdatesSeedsOnFailuresAndAdditions(true, true); + tearDown(); + setUp(); + } catch (Exception e) { + log.warn("Error in "+JavaClassNames.niceClassAndMethod()+", iteration "+(i+1)+" of "+COUNT, e); + throw e; + } + } + } + + @Test(groups="Integration") // because takes approx 5 seconds + public void testUpdateSeedsSlowAndRejoining() throws Exception { + final int COUNT = 1; + for (int i=0; i<COUNT; i++) { + log.info("Test 
"+JavaClassNames.niceClassAndMethod()+", iteration "+(i+1)+" of "+COUNT); + doTestUpdatesSeedsOnFailuresAndAdditions(false, true); + tearDown(); + setUp(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java new file mode 100644 index 0000000..cbf55ed --- /dev/null +++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import static org.testng.Assert.assertEquals; + +import java.util.Collection; +import java.util.Set; + +import org.apache.brooklyn.entity.nosql.cassandra.CassandraDatacenter; +import org.apache.brooklyn.entity.nosql.cassandra.CassandraFabric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.BrooklynAppUnitTestSupport; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.AbstractEntity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.EmptySoftwareProcess; +import brooklyn.entity.basic.EntityInternal; +import brooklyn.entity.basic.EntityLocal; +import brooklyn.entity.basic.Lifecycle; +import brooklyn.entity.basic.ServiceStateLogic; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.entity.trait.Startable; +import brooklyn.location.Location; +import brooklyn.location.LocationSpec; +import brooklyn.location.basic.LocalhostMachineProvisioningLocation; +import brooklyn.test.EntityTestUtils; +import brooklyn.util.time.Duration; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +public class CassandraFabricTest extends BrooklynAppUnitTestSupport { + + private static final Logger log = LoggerFactory.getLogger(CassandraFabricTest.class); + + private LocalhostMachineProvisioningLocation loc1; + private LocalhostMachineProvisioningLocation loc2; + private CassandraFabric fabric; + + @BeforeMethod(alwaysRun=true) + @Override + public void setUp() throws Exception { + super.setUp(); + loc1 = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class)); + loc2 = 
mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class)); + } + + @Test + public void testPopulatesInitialSeeds() throws Exception { + fabric = app.createAndManageChild(EntitySpec.create(CassandraFabric.class) + .configure(CassandraFabric.INITIAL_QUORUM_SIZE, 2) + .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO) + .configure(CassandraFabric.MEMBER_SPEC, EntitySpec.create(CassandraDatacenter.class) + .configure(CassandraDatacenter.INITIAL_SIZE, 2) + .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class)))); + + app.start(ImmutableList.of(loc1, loc2)); + CassandraDatacenter d1 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 0); + CassandraDatacenter d2 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 1); + + final EmptySoftwareProcess d1a = (EmptySoftwareProcess) Iterables.get(d1.getMembers(), 0); + final EmptySoftwareProcess d1b = (EmptySoftwareProcess) Iterables.get(d1.getMembers(), 1); + + final EmptySoftwareProcess d2a = (EmptySoftwareProcess) Iterables.get(d2.getMembers(), 0); + final EmptySoftwareProcess d2b = (EmptySoftwareProcess) Iterables.get(d2.getMembers(), 1); + + Predicate<Set<Entity>> predicate = new Predicate<Set<Entity>>() { + @Override public boolean apply(Set<Entity> input) { + return input != null && input.size() >= 2 && + Sets.intersection(input, ImmutableSet.of(d1a, d1b)).size() == 1 && + Sets.intersection(input, ImmutableSet.of(d2a, d2b)).size() == 1; + } + }; + EntityTestUtils.assertAttributeEventually(fabric, CassandraFabric.CURRENT_SEEDS, predicate); + EntityTestUtils.assertAttributeEventually(d1, CassandraDatacenter.CURRENT_SEEDS, predicate); + EntityTestUtils.assertAttributeEventually(d2, CassandraDatacenter.CURRENT_SEEDS, predicate); + + Set<Entity> seeds = fabric.getAttribute(CassandraFabric.CURRENT_SEEDS); + assertEquals(d1.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds); + 
assertEquals(d2.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds); + log.info("Seeds="+seeds); + } + + @Test + public void testPopulatesInitialSeedsWhenNodesOfOneClusterComeUpBeforeTheOtherCluster() throws Exception { + fabric = app.createAndManageChild(EntitySpec.create(CassandraFabric.class) + .configure(CassandraFabric.INITIAL_QUORUM_SIZE, 2) + .configure(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER, Duration.ZERO) + .configure(CassandraFabric.MEMBER_SPEC, EntitySpec.create(CassandraDatacenter.class) + .configure(CassandraDatacenter.INITIAL_SIZE, 2) + .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(DummyCassandraNode.class)))); + + Thread t = new Thread() { + public void run() { + app.start(ImmutableList.of(loc1, loc2)); + } + }; + t.start(); + try { + EntityTestUtils.assertGroupSizeEqualsEventually(fabric, 2); + CassandraDatacenter d1 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 0); + CassandraDatacenter d2 = (CassandraDatacenter) Iterables.get(fabric.getMembers(), 1); + + EntityTestUtils.assertGroupSizeEqualsEventually(d1, 2); + final DummyCassandraNode d1a = (DummyCassandraNode) Iterables.get(d1.getMembers(), 0); + final DummyCassandraNode d1b = (DummyCassandraNode) Iterables.get(d1.getMembers(), 1); + + EntityTestUtils.assertGroupSizeEqualsEventually(d2, 2); + final DummyCassandraNode d2a = (DummyCassandraNode) Iterables.get(d2.getMembers(), 0); + final DummyCassandraNode d2b = (DummyCassandraNode) Iterables.get(d2.getMembers(), 1); + + d1a.setAttribute(Attributes.HOSTNAME, "d1a"); + d1b.setAttribute(Attributes.HOSTNAME, "d1b"); + + Thread.sleep(1000); + d2a.setAttribute(Attributes.HOSTNAME, "d2a"); + d2b.setAttribute(Attributes.HOSTNAME, "d2b"); + + Predicate<Set<Entity>> predicate = new Predicate<Set<Entity>>() { + @Override public boolean apply(Set<Entity> input) { + return input != null && input.size() >= 2 && + Sets.intersection(input, ImmutableSet.of(d1a, d1b)).size() == 1 && + Sets.intersection(input, 
ImmutableSet.of(d2a, d2b)).size() == 1; + } + }; + EntityTestUtils.assertAttributeEventually(fabric, CassandraFabric.CURRENT_SEEDS, predicate); + EntityTestUtils.assertAttributeEventually(d1, CassandraDatacenter.CURRENT_SEEDS, predicate); + EntityTestUtils.assertAttributeEventually(d2, CassandraDatacenter.CURRENT_SEEDS, predicate); + + Set<Entity> seeds = fabric.getAttribute(CassandraFabric.CURRENT_SEEDS); + assertEquals(d1.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds); + assertEquals(d2.getAttribute(CassandraDatacenter.CURRENT_SEEDS), seeds); + log.info("Seeds="+seeds); + } finally { + log.info("Failed seeds; fabric="+fabric.getAttribute(CassandraFabric.CURRENT_SEEDS)); + t.interrupt(); + } + } + + + @ImplementedBy(DummyCassandraNodeImpl.class) + public interface DummyCassandraNode extends Entity, Startable, EntityLocal, EntityInternal { + } + + public static class DummyCassandraNodeImpl extends AbstractEntity implements DummyCassandraNode { + + @Override + public void start(Collection<? 
extends Location> locations) { + ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING); + } + + @Override + public void stop() { + ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPING); + } + + @Override + public void restart() { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java new file mode 100644 index 0000000..495843f --- /dev/null +++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeEc2LiveTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode; +import org.apache.brooklyn.entity.nosql.cassandra.AstyanaxSupport.AstyanaxSample; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.AbstractEc2LiveTest; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.location.Location; +import brooklyn.test.EntityTestUtils; + +import com.google.common.collect.ImmutableList; + +public class CassandraNodeEc2LiveTest extends AbstractEc2LiveTest { + + private static final Logger log = LoggerFactory.getLogger(CassandraNodeEc2LiveTest.class); + + @Override + protected void doTest(Location loc) throws Exception { + log.info("Testing Cassandra on {}", loc); + + CassandraNode cassandra = app.createAndManageChild(EntitySpec.create(CassandraNode.class) + .configure("thriftPort", "9876+") + .configure("clusterName", "TestCluster")); + app.start(ImmutableList.of(loc)); + + EntityTestUtils.assertAttributeEqualsEventually(cassandra, CassandraNode.SERVICE_UP, true); + + AstyanaxSample astyanax = new AstyanaxSample(cassandra); + astyanax.astyanaxTest(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeIntegrationTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeIntegrationTest.java b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeIntegrationTest.java new file mode 100644 index 0000000..b5a657f --- /dev/null +++ b/software/nosql/src/test/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeIntegrationTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import java.util.List; +import java.util.Map; + +import org.apache.brooklyn.entity.nosql.cassandra.CassandraNode; +import org.apache.brooklyn.entity.nosql.cassandra.AstyanaxSupport.AstyanaxSample; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.Entities; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.trait.Startable; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.test.Asserts; +import brooklyn.test.EntityTestUtils; +import brooklyn.test.NetworkingTestUtils; +import brooklyn.util.math.MathPredicates; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; + +/** + * Cassandra integration tests. + * + * Test the operation of the {@link CassandraNode} class. 
+ */ +public class CassandraNodeIntegrationTest extends AbstractCassandraNodeTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraNodeIntegrationTest.class); + + public static void assertCassandraPortsAvailableEventually() { + Map<String, Integer> ports = getCassandraDefaultPorts(); + NetworkingTestUtils.assertPortsAvailableEventually(ports); + LOG.info("Confirmed Cassandra ports are available: "+ports); + } + + public static Map<String, Integer> getCassandraDefaultPorts() { + List<PortAttributeSensorAndConfigKey> ports = ImmutableList.of( + CassandraNode.GOSSIP_PORT, + CassandraNode.SSL_GOSSIP_PORT, + CassandraNode.THRIFT_PORT, + CassandraNode.NATIVE_TRANSPORT_PORT, + CassandraNode.RMI_REGISTRY_PORT); + Map<String, Integer> result = Maps.newLinkedHashMap(); + for (PortAttributeSensorAndConfigKey key : ports) { + result.put(key.getName(), key.getConfigKey().getDefaultValue().iterator().next()); + } + return result; + } + + @BeforeMethod(alwaysRun = true) + @Override + public void setUp() throws Exception { + assertCassandraPortsAvailableEventually(); + super.setUp(); + } + + @AfterMethod(alwaysRun=true) + @Override + public void tearDown() throws Exception { + super.tearDown(); + assertCassandraPortsAvailableEventually(); + } + + /** + * Test that a node starts and sets SERVICE_UP correctly. + */ + @Test(groups = "Integration") + public void canStartupAndShutdown() { + cassandra = app.createAndManageChild(EntitySpec.create(CassandraNode.class) + .configure("jmxPort", "11099+") + .configure("rmiRegistryPort", "19001+")); + app.start(ImmutableList.of(testLocation)); + + EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, true); + Entities.dumpInfo(app); + + cassandra.stop(); + + EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, false); + } + + /** + * Test that a keyspace and column family can be created and used with Astyanax client. 
+ */ + @Test(groups = "Integration") + public void testConnection() throws Exception { + cassandra = app.createAndManageChild(EntitySpec.create(CassandraNode.class) + .configure("jmxPort", "11099+") + .configure("rmiRegistryPort", "19001+") + .configure("thriftPort", "9876+")); + app.start(ImmutableList.of(testLocation)); + + EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, true); + + AstyanaxSample astyanax = new AstyanaxSample(cassandra); + astyanax.astyanaxTest(); + } + + /** + * Cassandra v2 needs Java >= 1.7. If you have java 6 as the default locally, then you can use + * something like {@code .configure("shell.env", MutableMap.of("JAVA_HOME", "/Library/Java/JavaVirtualMachines/jdk1.7.0_51.jdk/Contents/Home"))} + */ + @Test(groups = "Integration") + public void testCassandraVersion2() throws Exception { + // TODO In v2.0.10, the bin/cassandra script changed to add an additional check for JMX connectivity. + // This causes cassandra script to hang for us (presumably due to the CLASSPATH/JVM_OPTS we're passing + // in, regarding JMX agent). 
+ // See: + // - https://issues.apache.org/jira/browse/CASSANDRA-7254 + // - https://github.com/apache/cassandra/blame/trunk/bin/cassandra#L211-216 + + String version = "2.0.9"; + String majorMinorVersion = "2.0"; + + cassandra = app.createAndManageChild(EntitySpec.create(CassandraNode.class) + .configure(CassandraNode.SUGGESTED_VERSION, version) + .configure(CassandraNode.NUM_TOKENS_PER_NODE, 256) + .configure("jmxPort", "11099+") + .configure("rmiRegistryPort", "19001+")); + app.start(ImmutableList.of(testLocation)); + + EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, true); + Entities.dumpInfo(app); + + AstyanaxSample astyanax = new AstyanaxSample(cassandra); + astyanax.astyanaxTest(); + + assertEquals(cassandra.getMajorMinorVersion(), majorMinorVersion); + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertNotNull(cassandra.getAttribute(CassandraNode.TOKEN)); + assertNotNull(cassandra.getAttribute(CassandraNode.TOKENS)); + assertEquals(cassandra.getAttribute(CassandraNode.TOKENS).size(), 256, "tokens="+cassandra.getAttribute(CassandraNode.TOKENS)); + + assertEquals(cassandra.getAttribute(CassandraNode.PEERS), (Integer)256); + assertEquals(cassandra.getAttribute(CassandraNode.LIVE_NODE_COUNT), (Integer)1); + + assertTrue(cassandra.getAttribute(CassandraNode.SERVICE_UP_JMX)); + assertNotNull(cassandra.getAttribute(CassandraNode.THRIFT_PORT_LATENCY)); + + assertNotNull(cassandra.getAttribute(CassandraNode.READ_PENDING)); + assertNotNull(cassandra.getAttribute(CassandraNode.READ_ACTIVE)); + EntityTestUtils.assertAttribute(cassandra, CassandraNode.READ_COMPLETED, MathPredicates.greaterThanOrEqual(1)); + assertNotNull(cassandra.getAttribute(CassandraNode.WRITE_PENDING)); + assertNotNull(cassandra.getAttribute(CassandraNode.WRITE_ACTIVE)); + EntityTestUtils.assertAttribute(cassandra, CassandraNode.WRITE_COMPLETED, MathPredicates.greaterThanOrEqual(1)); + + 
assertNotNull(cassandra.getAttribute(CassandraNode.READS_PER_SECOND_LAST)); + assertNotNull(cassandra.getAttribute(CassandraNode.WRITES_PER_SECOND_LAST)); + + assertNotNull(cassandra.getAttribute(CassandraNode.THRIFT_PORT_LATENCY_IN_WINDOW)); + assertNotNull(cassandra.getAttribute(CassandraNode.READS_PER_SECOND_IN_WINDOW)); + assertNotNull(cassandra.getAttribute(CassandraNode.WRITES_PER_SECOND_IN_WINDOW)); + + // an example MXBean + EntityTestUtils.assertAttribute(cassandra, CassandraNode.MAX_HEAP_MEMORY, MathPredicates.greaterThanOrEqual(1)); + }}); + + cassandra.stop(); + + EntityTestUtils.assertAttributeEqualsEventually(cassandra, Startable.SERVICE_UP, false); + } +}
