STORM-1075 add external module storm-cassandra
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/641300e2 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/641300e2 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/641300e2 Branch: refs/heads/master Commit: 641300e2441d732fc2be98f05e70c6e5db4e4cf6 Parents: 437c4b1 Author: Florian Hussonnois <[email protected]> Authored: Wed Oct 28 12:08:58 2015 +0100 Committer: Florian Hussonnois <[email protected]> Committed: Wed Nov 25 09:50:15 2015 +0100 ---------------------------------------------------------------------- external/storm-cassandra/README.md | 180 +++++++++++++++++ external/storm-cassandra/pom.xml | 124 ++++++++++++ .../AbstractExecutionResultHandler.java | 60 ++++++ .../cassandra/BaseExecutionResultHandler.java | 85 ++++++++ .../storm/cassandra/CassandraContext.java | 92 +++++++++ .../cassandra/DynamicStatementBuilder.java | 199 +++++++++++++++++++ .../storm/cassandra/ExecutionResultHandler.java | 98 +++++++++ .../storm/cassandra/Murmur3StreamGrouping.java | 89 +++++++++ .../storm/cassandra/bolt/BaseCassandraBolt.java | 193 ++++++++++++++++++ .../bolt/BatchCassandraWriterBolt.java | 192 ++++++++++++++++++ .../cassandra/bolt/CassandraWriterBolt.java | 72 +++++++ .../cassandra/bolt/GroupingBatchBuilder.java | 68 +++++++ .../bolt/PairBatchStatementTuples.java | 52 +++++ .../cassandra/bolt/PairStatementTuple.java | 50 +++++ .../storm/cassandra/client/CassandraConf.java | 146 ++++++++++++++ .../storm/cassandra/client/ClusterFactory.java | 71 +++++++ .../storm/cassandra/client/SimpleClient.java | 42 ++++ .../cassandra/client/SimpleClientProvider.java | 35 ++++ .../cassandra/client/impl/DefaultClient.java | 123 ++++++++++++ .../cassandra/context/BaseBeanFactory.java | 65 ++++++ .../storm/cassandra/context/BeanFactory.java | 48 +++++ .../storm/cassandra/context/WorkerCtx.java | 89 +++++++++ .../storm/cassandra/executor/AsyncExecutor.java | 152 ++++++++++++++ 
.../executor/AsyncExecutorProvider.java | 40 ++++ .../cassandra/executor/AsyncResultHandler.java | 64 ++++++ .../executor/ExecutionResultCollector.java | 99 +++++++++ .../executor/impl/BatchAsyncResultHandler.java | 73 +++++++ .../executor/impl/SingleAsyncResultHandler.java | 72 +++++++ .../query/BatchStatementTupleMapper.java | 57 ++++++ .../cassandra/query/CQLClauseTupleMapper.java | 36 ++++ .../cassandra/query/CQLStatementBuilder.java | 31 +++ .../query/CQLStatementTupleMapper.java | 86 ++++++++ .../cassandra/query/CQLTableTupleMapper.java | 39 ++++ .../cassandra/query/CQLValuesTupleMapper.java | 74 +++++++ .../storm/cassandra/query/ContextQuery.java | 83 ++++++++ .../query/SimpleCQLStatementTupleMapper.java | 51 +++++ .../query/impl/BoundStatementMapperBuilder.java | 107 ++++++++++ .../query/impl/InsertStatementBuilder.java | 153 ++++++++++++++ .../query/impl/UpdateStatementBuilder.java | 118 +++++++++++ .../cassandra/query/selector/FieldSelector.java | 68 +++++++ .../cassandra/DynamicStatementBuilderTest.java | 133 +++++++++++++ .../apache/storm/cassandra/WeatherSpout.java | 84 ++++++++ .../storm/cassandra/bolt/BaseTopologyTest.java | 60 ++++++ .../bolt/BatchCassandraWriterBoltTest.java | 62 ++++++ .../cassandra/bolt/CassandraWriterBoltTest.java | 63 ++++++ .../src/test/resources/schema.cql | 7 + pom.xml | 1 + storm-dist/binary/src/main/assembly/binary.xml | 15 +- 48 files changed, 4000 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/README.md ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md new file mode 100644 index 0000000..96454b6 --- /dev/null +++ b/external/storm-cassandra/README.md @@ -0,0 +1,180 @@ +Storm Cassandra Integration (CQL). 
+------------------- + +[Apache Storm](https://storm.apache.org/) is a free and open source distributed realtime computation system. + +### Bolt API implementation for Apache Cassandra + +This library provides a core Storm bolt on top of Apache Cassandra. +It provides a simple DSL to map a Storm *Tuple* to a Cassandra Query Language (CQL) *Statement*. + + +### Configuration +The following properties may be passed to the Storm configuration. + +| **Property name** | **Description** | **Default** | +| ------------------------------------- | ----------------| -------------| +| **cassandra.keyspace** | - | | +| **cassandra.nodes** | - | {"localhost"}| +| **cassandra.username** | - | - | +| **cassandra.password** | - | - | +| **cassandra.port** | - | 9092 | +| **cassandra.output.consistencyLevel** | - | ONE | +| **cassandra.batch.size.rows** | - | 100 | + +### CassandraWriterBolt + +#### Static import +```java + +import static org.apache.storm.cassandra.DynamicStatementBuilder.*; + +``` + +#### Insert Query Builder +##### Insert query including only the specified tuple fields. +```java + + new CassandraWriterBolt( + insertInto("album") + .values( + with(fields("title", "year", "performer", "genre", "tracks")) + ).build()); +``` +##### Insert query including all tuple fields. +```java + + new CassandraWriterBolt( + insertInto("album") + .values(all()).build()); +``` + +##### Insert multiple queries from one input tuple.
+```java + + new CassandraWriterBolt( + async( + insertInto("titles_per_album").values(all()), + insertInto("titles_per_performer").values(all()) + ) + ); +``` + +##### Insert query including some fields with aliases +```java + + new CassandraWriterBolt( + insertInto("album") + .values( + with(field("ti").as("title"), + field("ye").as("year"), + field("pe").as("performer"), + field("ge").as("genre"), + field("tr").as("tracks")) + ).build()); +``` + +##### Insert query with static bound query +```java + + new CassandraWriterBolt( + boundQuery("INSERT INTO album (\"title\", \"year\", \"performer\", \"genre\", \"tracks\") VALUES(?, ?, ?, ?, ?);") + .bind(all())); +``` + +##### Insert query with bound statement loaded from the Storm configuration +```java + + new CassandraWriterBolt( + boundQuery(named("insertIntoAlbum")) + .bind(all())); +``` + +##### Insert query with bound statement loaded from a tuple field +```java + + new CassandraWriterBolt( + boundQuery(namedByField("cql")) + .bind(all())); +``` + +##### Insert query with batch statement +```java + + // Logged + new CassandraWriterBolt(loggedBatch( + insertInto("title_per_album").values(all()), + insertInto("title_per_performer").values(all()) + ) + ); +// UnLogged + new CassandraWriterBolt(unLoggedBatch( + insertInto("title_per_album").values(all()), + insertInto("title_per_performer").values(all()) + ) + ); +``` + +### How to handle query execution results + +The interface *ExecutionResultHandler* can be used to customize how an execution result should be handled.
+ +```java +public interface ExecutionResultHandler extends Serializable { + void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple); + + void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple); + + void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple); + + void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple); + + void onQuerySuccess(OutputCollector collector, Tuple tuple); +} +``` + +By default, the CassandraBolt fails a tuple on every Cassandra exception (see [BaseExecutionResultHandler](https://github.com/apache/storm/blob/master/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java)). + +```java + new CassandraWriterBolt(insertInto("album").values(all()).build()) + .withResultHandler(new MyCustomResultHandler()); +``` + +### Declare Output fields + +A CassandraBolt can declare output fields / stream output fields. +For instance, this may be used to emit a new tuple on error, or to chain queries. + +```java + new CassandraWriterBolt(insertInto("album").values(all()).build()) + .withResultHandler(new EmitOnDriverExceptionResultHandler()) + .withStreamOutputFields("stream_error", new Fields("message")); + + public static class EmitOnDriverExceptionResultHandler extends BaseExecutionResultHandler { + @Override + protected void onDriverException(DriverException e, OutputCollector collector, Tuple tuple) { + LOG.error("An error occurred while executing cassandra statement", e); + collector.emit("stream_error", new Values(e.getMessage())); + collector.ack(tuple); + } + } +``` + +## License + +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership.
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml new file mode 100644 index 0000000..86cd87d --- /dev/null +++ b/external/storm-cassandra/pom.xml @@ -0,0 +1,124 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>storm</artifactId> + <groupId>org.apache.storm</groupId> + <version>0.11.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <packaging>jar</packaging> + <artifactId>storm-cassandra</artifactId> + <name>storm-cassandra</name> + <description>Storm Bolts for Apache Cassandra</description> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <java.version>1.7</java.version> + <org.slf4j.version>1.7.6</org.slf4j.version> + <jackson.databind.version>2.3.2</jackson.databind.version> + <junit.version>4.11</junit.version> + <guava.version>16.0.1</guava.version> + <commons-lang3.version>3.3</commons-lang3.version> + <cassandra.driver.core.version>2.1.7.1</cassandra.driver.core.version> + </properties> + + <developers> + <developer> + <id>fhuss</id> + <name>Florian Hussonnois</name> + <email>[email protected]</email> + <url>https://github.com/fhussonnois</url> + <roles> + <role>developer</role> + </roles> + </developer> + </developers> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.1</version> + <configuration> + <source>${java.version}</source> + <target>${java.version}</target> + </configuration> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>com.datastax.cassandra</groupId> + <artifactId>cassandra-driver-core</artifactId> + <version>${cassandra.driver.core.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + 
<groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${org.slf4j.version}</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons-lang3.version}</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.10.19</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.cassandraunit</groupId> + <artifactId>cassandra-unit</artifactId> + <version>2.1.3.1</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java new file mode 100644 index 0000000..80ae284 --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
 You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra; + +import backtype.storm.task.OutputCollector; +import backtype.storm.tuple.Tuple; +import com.datastax.driver.core.exceptions.QueryValidationException; +import com.datastax.driver.core.exceptions.ReadTimeoutException; +import com.datastax.driver.core.exceptions.UnavailableException; +import com.datastax.driver.core.exceptions.WriteTimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Base {@link ExecutionResultHandler} that routes a {@link Throwable} to the matching typed callback (query validation, read timeout, write timeout or unavailable).
+ * Any other throwable is reported through {@link OutputCollector#reportError(Throwable)} and the tuple is failed; the list variant repeats this for each tuple, so such an error is reported once per tuple. + */ +public abstract class AbstractExecutionResultHandler implements ExecutionResultHandler { + + public static final Logger LOG = LoggerFactory.getLogger(AbstractExecutionResultHandler.class); + + @Override + public void onThrowable(Throwable t, OutputCollector collector, Tuple i) { + if( t instanceof QueryValidationException) { + this.onQueryValidationException((QueryValidationException)t, collector, i); + } else if (t instanceof ReadTimeoutException) { + this.onReadTimeoutException((ReadTimeoutException)t, collector, i); + } else if (t instanceof WriteTimeoutException) { + this.onWriteTimeoutException((WriteTimeoutException) t, collector, i); + } else if (t instanceof UnavailableException) { + this.onUnavailableException((UnavailableException) t, collector, i); + } else { + collector.reportError(t); + collector.fail(i); + } + } + + @Override + public void onThrowable(Throwable t, OutputCollector collector, List<Tuple> tl) { + for(Tuple i : tl) onThrowable(t, collector, i); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java new file mode 100644 index 0000000..c7fc4f1 --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
 The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra; + +import backtype.storm.task.OutputCollector; +import backtype.storm.tuple.Tuple; +import com.datastax.driver.core.exceptions.*; +import org.slf4j.LoggerFactory; + +/** + * Simple {@link ExecutionResultHandler} which fails the incoming tuple when a {@link com.datastax.driver.core.exceptions.DriverException} is thrown. + * For the typed driver exceptions, the exception is logged, then the tuple is failed and the exception is reported to Storm.
+ * {@link #onQuerySuccess(OutputCollector, Tuple)} does nothing. + */ +public class BaseExecutionResultHandler extends AbstractExecutionResultHandler { + + private final static org.slf4j.Logger LOG = LoggerFactory.getLogger(BaseExecutionResultHandler.class); + + /** + * {@inheritDoc} + */ + @Override + public void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple) { + onDriverException(e, collector, tuple); + } + /** + * {@inheritDoc} + */ + @Override + public void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple) { + onDriverException(e, collector, tuple); + } + /** + * {@inheritDoc} + */ + @Override + public void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple) { + onDriverException(e, collector, tuple); + } + /** + * {@inheritDoc} + */ + @Override + public void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple) { + onDriverException(e, collector, tuple); + } + /** + * {@inheritDoc} + */ + @Override + public void onQuerySuccess(OutputCollector collector, Tuple tuple) { + /* Intentionally empty: per ExecutionResultHandler#onQuerySuccess, acknowledging the tuple is not this handler's responsibility. */ + } + + /** + * Called by the four typed exception callbacks above unless a subclass overrides them, so that some or all of + * those exceptions can be handled in a single method. The default implementation logs the exception, fails the tuple and then reports the exception to Storm.
+ * + * @param e the exception thrown + * @param collector the output collector + * @param tuple the tuple in failure + */ + protected void onDriverException(DriverException e, OutputCollector collector, Tuple tuple) { + LOG.error("An error occurred while executing cassandra statement", e); + collector.fail(tuple); + collector.reportError(e); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java new file mode 100644 index 0000000..3081b0d --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
 */ +package org.apache.storm.cassandra; + +import com.datastax.driver.core.Cluster; +import org.apache.storm.cassandra.client.CassandraConf; +import org.apache.storm.cassandra.client.ClusterFactory; +import org.apache.storm.cassandra.client.SimpleClient; +import org.apache.storm.cassandra.client.SimpleClientProvider; +import org.apache.storm.cassandra.client.impl.DefaultClient; +import org.apache.storm.cassandra.context.BaseBeanFactory; +import org.apache.storm.cassandra.context.WorkerCtx; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Worker-level context that registers the bean factories for {@link SimpleClient}, {@link CassandraConf} and {@link Cluster}, and provides {@link SimpleClient} instances built from a Storm configuration. + */ +public class CassandraContext extends WorkerCtx implements SimpleClientProvider { + + /** + * Creates a new {@link CassandraContext} instance. + */ + public CassandraContext() { + register(SimpleClient.class, new ClientFactory()); + register(CassandraConf.class, new CassandraConfFactory()); + register(Cluster.class, new ClusterFactory()); + } + + /** + * {@inheritDoc} If the cached client reports it is closed, the bean is requested again with {@code true} as third argument (presumably forcing re-creation; confirm against WorkerCtx#getWorkerBean). + */ + @Override + public SimpleClient getClient(Map<String, Object> config) { + SimpleClient client = getWorkerBean(SimpleClient.class, config); + if (client.isClose() ) + client = getWorkerBean(SimpleClient.class, config, true); + return client; + } + + /** + * Simple class to make {@link CassandraConf} from a Storm topology configuration. + */ + public static final class CassandraConfFactory extends BaseBeanFactory<CassandraConf> { + /** + * {@inheritDoc} + */ + @Override + protected CassandraConf make(Map<String, Object> stormConf) { + return new CassandraConf(stormConf); + } + } + + /** + * Factory that makes a {@link SimpleClient} (a {@link DefaultClient} bound to the configured keyspace) from a Storm topology configuration.
+ */ + public static final class ClientFactory extends BaseBeanFactory<SimpleClient> { + + private static final Logger LOG = LoggerFactory.getLogger(ClientFactory.class); + /** + * {@inheritDoc} If the cached {@link Cluster} is closed, a warning is logged and the cluster bean is requested again with {@code true} as third argument. + */ + @Override + protected SimpleClient make(Map<String, Object> stormConf) { + Cluster cluster = this.context.getWorkerBean(Cluster.class, stormConf); + if( cluster.isClosed() ) { + LOG.warn("Cluster is closed - trigger new initialization!"); + cluster = this.context.getWorkerBean(Cluster.class, stormConf, true); + } + CassandraConf config = this.context.getWorkerBean(CassandraConf.class, stormConf); + return new DefaultClient(cluster, config.getKeyspace()); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java new file mode 100644 index 0000000..deea8da --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
 See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra; + +import com.datastax.driver.core.BatchStatement; +import org.apache.storm.cassandra.query.*; +import org.apache.storm.cassandra.query.impl.BoundStatementMapperBuilder; +import org.apache.storm.cassandra.query.impl.InsertStatementBuilder; +import org.apache.storm.cassandra.query.impl.UpdateStatementBuilder; +import org.apache.storm.cassandra.query.selector.FieldSelector; + +import java.io.Serializable; +import java.util.*; +/** Static factory methods (not instantiable) forming a small DSL that maps Storm tuples to CQL statements: insert, update, bound, batch and async. */ +public class DynamicStatementBuilder implements Serializable { + + private DynamicStatementBuilder() { + } + + /** + * Builds a new insert statement for the specified table. + * + * @param table the table's name. + * @return a new {@link InsertStatementBuilder} instance. + */ + public static final InsertStatementBuilder insertInto(String table) { + return new InsertStatementBuilder(table); + } + /** + * Builds a new insert statement based on the specified CQL mapper. + * + * @param mapper the CQL mapper. + * @return a new {@link InsertStatementBuilder} instance. + */ + public static final InsertStatementBuilder insertInto(CQLTableTupleMapper mapper) { + return new InsertStatementBuilder(mapper); + } + /** + * Builds a new insert statement for the specified keyspace and table. + * + * @param ks the keyspace to use. + * @param table the table's name. + * @return a new {@link InsertStatementBuilder} instance. + */ + public static final InsertStatementBuilder insertInto(String ks, String table) { + return new InsertStatementBuilder(table, ks); + } + + /** + * Builds a new update statement for the specified table. + * + * @param table the table's name. + * @return a new {@link UpdateStatementBuilder} instance.
+ */ + public static final UpdateStatementBuilder update(String table) { + return new UpdateStatementBuilder(table); + } + + /** + * Builds a new update statement for the specified keyspace and table. + * @param ks the keyspace to use. + * @param table the table's name. + * @return a new {@link UpdateStatementBuilder} instance. + */ + public static final UpdateStatementBuilder update(String ks, String table) { + return new UpdateStatementBuilder(table, ks); + } + + /** + * Builds a new bound statement based on the specified query. + * + * @param cql the query. + * @return a new {@link BoundStatementMapperBuilder} instance. + */ + public static final BoundStatementMapperBuilder boundQuery(String cql) { + return new BoundStatementMapperBuilder(cql); + } + + /** + * Builds a new bound statement whose CQL query is resolved through the given {@link ContextQuery}. + * + * @param field a context used to resolve the cassandra query. + * @return a new {@link BoundStatementMapperBuilder} instance. + */ + public static final BoundStatementMapperBuilder boundQuery(ContextQuery field) { + return new BoundStatementMapperBuilder(field); + } + + /** + * Builds multiple statements which will be executed asynchronously. + * + * @param builders a list of {@link CQLStatementBuilder}. + * @return a new {@link CQLStatementTupleMapper}. + */ + public static final CQLStatementTupleMapper async(final CQLStatementBuilder... builders) { + return new CQLStatementTupleMapper.DynamicCQLStatementTupleMapper(Arrays.asList(builders)); + } + + /** + * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#LOGGED} batch statement for the specified CQL statement builders. + */ + public static final BatchStatementTupleMapper loggedBatch(CQLStatementBuilder... builders) { + return newBatchStatementBuilder(BatchStatement.Type.LOGGED, builders); + } + /** + * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#COUNTER} batch statement for the specified CQL statement builders.
+ */ + public static final BatchStatementTupleMapper counterBatch(CQLStatementBuilder... builders) { + return newBatchStatementBuilder(BatchStatement.Type.COUNTER, builders); + } + /** + * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#UNLOGGED} batch statement for the specified CQL statement builders. + */ + public static final BatchStatementTupleMapper unLoggedBatch(CQLStatementBuilder... builders) { + return newBatchStatementBuilder(BatchStatement.Type.UNLOGGED, builders); + } + /** Calls {@code build()} on every builder, in order, and wraps the resulting mappers in a batch of the given type. */ + private static BatchStatementTupleMapper newBatchStatementBuilder(BatchStatement.Type type, CQLStatementBuilder[] builders) { + List<CQLStatementTupleMapper> mappers = new ArrayList<>(builders.length); + for(CQLStatementBuilder b : Arrays.asList(builders)) + mappers.add(b.build()); + return new BatchStatementTupleMapper(type, mappers); + } + + /** + * Retrieves from the storm configuration the specified named query. + * + * @param name query's name. + */ + public static final ContextQuery named(final String name) { + return new ContextQuery.BoundQueryContext(name); + } + + /** + * Retrieves from the storm configuration the named query specified by a tuple field. + * + * @param fieldName name of the tuple field that contains the name of the query. + */ + public static final ContextQuery namedByField(final String fieldName) { + return new ContextQuery.BoundQueryNamedByFieldContext(fieldName); + } + + + /** + * Maps a CQL value to the specified field from an input tuple. + * + * @param name the name of a tuple field. + * @return a new {@link FieldSelector}. + */ + public static final FieldSelector field(final String name) { + return new FieldSelector(name); + } + + /** + * Maps CQL values to all specified fields from an input tuple. + * + * @param fields a list of tuple fields + * @return an array of {@link FieldSelector}, one per field name, in the given order. + */ + public static final FieldSelector[] fields(final String...
fields) { + int size = fields.length; + List<FieldSelector> fl = new ArrayList<>(size); + for(int i = 0 ; i < size; i++) + fl.add(new FieldSelector(fields[i])); + return fl.toArray(new FieldSelector[size]); + } + + /** + * Includes only the specified tuple fields. + * + * @param fields the field selectors to include. + */ + public static final CQLValuesTupleMapper with(final FieldSelector... fields) { + return new CQLValuesTupleMapper.WithFieldTupleMapper(Arrays.asList(fields)); + } + + /** + * Includes all tuple fields. + */ + public static final CQLValuesTupleMapper all() { + return new CQLValuesTupleMapper.AllTupleMapper(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java new file mode 100644 index 0000000..b804ee5 --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
+ * See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.exceptions.QueryValidationException;
+import com.datastax.driver.core.exceptions.ReadTimeoutException;
+import com.datastax.driver.core.exceptions.UnavailableException;
+import com.datastax.driver.core.exceptions.WriteTimeoutException;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Strategy deciding what to do with input tuples once a query has either succeeded or failed.
+ * One callback exists per driver exception type, plus generic fallbacks for any throwable.
+ */
+public interface ExecutionResultHandler extends Serializable {
+
+    /**
+     * Called when a {@link com.datastax.driver.core.exceptions.QueryValidationException} is thrown.
+     *
+     * @param e         the exception raised by the Cassandra driver.
+     * @param collector the storm output collector.
+     * @param tuple     the input tuple associated with the query.
+     */
+    void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple);
+
+    /**
+     * Called when a {@link com.datastax.driver.core.exceptions.ReadTimeoutException} is thrown.
+     *
+     * @param e         the exception raised by the Cassandra driver.
+     * @param collector the storm output collector.
+     * @param tuple     the input tuple associated with the query.
+     */
+    void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple);
+
+    /**
+     * Called when a {@link com.datastax.driver.core.exceptions.WriteTimeoutException} is thrown.
+     *
+     * @param e         the exception raised by the Cassandra driver.
+     * @param collector the storm output collector.
+     * @param tuple     the input tuple associated with the query.
+     */
+    void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple);
+
+    /**
+     * Called when a {@link com.datastax.driver.core.exceptions.UnavailableException} is thrown.
+     *
+     * @param e         the exception raised by the Cassandra driver.
+     * @param collector the storm output collector.
+     * @param tuple     the input tuple associated with the query.
+     */
+    void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple);
+
+    /**
+     * Called when a query has been executed successfully.
+     * Implementations must NOT acknowledge the input tuple here; that is not this method's job.
+     *
+     * @param collector the storm output collector.
+     * @param tuple     the input tuple associated with the query.
+     */
+    void onQuerySuccess(OutputCollector collector, Tuple tuple);
+
+    /**
+     * Fallback invoked for a failure concerning a single input tuple.
+     *
+     * @param t         the thrown exception.
+     * @param collector the storm output collector.
+     * @param i         the input tuple.
+     */
+    void onThrowable(Throwable t, OutputCollector collector, Tuple i);
+
+    /**
+     * Fallback invoked for a failure concerning several input tuples at once.
+     *
+     * @param t         the thrown exception.
+     * @param collector the storm output collector.
+     * @param tl        the input tuples.
+     */
+    void onThrowable(Throwable t, OutputCollector collector, List<Tuple> tl);
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
new file mode 100644
index 0000000..a3f6887
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+import backtype.storm.topology.FailedException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Simple {@link backtype.storm.grouping.CustomStreamGrouping} that uses a Murmur3 hash of the
+ * tuple values to choose the target task of a tuple: equal values always go to the same task.
+ *
+ * This stream grouping may be used to optimise writes to Apache Cassandra.
+ */
+public class Murmur3StreamGrouping implements CustomStreamGrouping {
+
+    private List<Integer> targetTasks;
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+        this.targetTasks = targetTasks;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws FailedException if the values cannot be serialized for hashing.
+     */
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values) {
+        try {
+            // The cast narrows the hash to an int before the remainder, so the remainder lies
+            // strictly within (-size, size) and Math.abs can never be applied to Integer.MIN_VALUE.
+            int n = Math.abs((int) hashes(values) % targetTasks.size());
+            return Lists.newArrayList(targetTasks.get(n));
+        } catch (IOException e) {
+            throw new FailedException(e);
+        }
+    }
+
+    /**
+     * Computes the murmur3 hash for the specified values.
+     *
+     * Each value is UTF-8 encoded and written as a 2-byte length, the encoded bytes and a
+     * trailing 0 byte (the composite-key layout described in the references below); the
+     * murmur3_128 hash of the whole buffer is then reduced to a long with {@code HashCode.asLong()}.
+     * NOTE(review): writeShort keeps only the low 16 bits of the length, so values longer than
+     * 65535 bytes are not encoded unambiguously.
+     *
+     * http://stackoverflow.com/questions/27212797/cassandra-hashing-algorithm-with-composite-keys
+     * https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+     *
+     * @param values the fields which are part of the (composite) partition key; each must be a {@link String}.
+     * @return the computed hash for input values.
+     * @throws java.io.IOException if writing to the in-memory buffer fails.
+     * @throws ClassCastException if one of the values is not a {@link String}.
+     */
+    @VisibleForTesting
+    public static long hashes(List<Object> values) throws IOException {
+        byte[] keyBytes;
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(bos)) {
+            for (Object key : values) {
+                // StandardCharsets avoids a charset lookup by name on every value.
+                byte[] arr = ((String) key).getBytes(StandardCharsets.UTF_8);
+                out.writeShort(arr.length);
+                out.write(arr, 0, arr.length);
+                out.writeByte(0);
+            }
+            out.flush();
+            keyBytes = bos.toByteArray();
+        }
+        return Hashing.murmur3_128().hashBytes(keyBytes).asLong();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
new file mode 100644
index 0000000..7211ad3
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import org.apache.storm.cassandra.BaseExecutionResultHandler;
+import org.apache.storm.cassandra.CassandraContext;
+import org.apache.storm.cassandra.ExecutionResultHandler;
+import org.apache.storm.cassandra.client.CassandraConf;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.executor.AsyncExecutor;
+import org.apache.storm.cassandra.executor.AsyncExecutorProvider;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base {@link backtype.storm.topology.base.BaseRichBolt} for cassandra bolts.
+ *
+ * It connects a {@link SimpleClient} in {@link #prepare}, flushes the asynchronous result
+ * handler before every input tuple, acks tick tuples after calling {@link #tick()}, and
+ * declares the output fields registered through the {@code with*OutputFields} methods.
+ */
+public abstract class BaseCassandraBolt<T> extends BaseRichBolt {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BaseCassandraBolt.class);
+
+    protected OutputCollector outputCollector;
+
+    protected SimpleClientProvider clientProvider;
+    protected SimpleClient client;
+    protected Session session;
+    protected Map stormConfig;
+
+    protected CassandraConf cassandraConfConfig;
+
+    private CQLStatementTupleMapper mapper;
+    private ExecutionResultHandler resultHandler;
+
+    // Output fields per stream id; the null key stands for the default stream.
+    // Transient: this map is null on a deserialized copy of the bolt.
+    private transient Map<String, Fields> outputsFields = new HashMap<>();
+
+    /**
+     * Creates a new {@link BaseCassandraBolt} instance.
+     *
+     * @param mapper         the mapper building CQL statements from input tuples.
+     * @param clientProvider the provider of the cassandra client used by this bolt.
+     */
+    public BaseCassandraBolt(CQLStatementTupleMapper mapper, SimpleClientProvider clientProvider) {
+        this.mapper = mapper;
+        this.clientProvider = clientProvider;
+    }
+
+    /**
+     * Creates a new {@link BaseCassandraBolt} instance using a {@link CassandraContext} as client provider.
+     *
+     * @param tupleMapper the mapper building CQL statements from input tuples.
+     */
+    public BaseCassandraBolt(CQLStatementTupleMapper tupleMapper) {
+        this(tupleMapper, new CassandraContext());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * A {@link NoHostAvailableException} raised while connecting is reported to the collector
+     * rather than thrown; {@link #session} then stays unset.
+     */
+    @Override
+    public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
+        this.outputCollector = outputCollector;
+        this.stormConfig = stormConfig;
+        this.cassandraConfConfig = new CassandraConf(stormConfig);
+        this.client = clientProvider.getClient(this.stormConfig);
+        try {
+            session = client.connect();
+        } catch (NoHostAvailableException e) {
+            outputCollector.reportError(e);
+        }
+    }
+
+    public BaseCassandraBolt withResultHandler(ExecutionResultHandler resultHandler) {
+        this.resultHandler = resultHandler;
+        return this;
+    }
+
+    /**
+     * Declares the output fields of the default stream.
+     */
+    public BaseCassandraBolt withOutputFields(Fields fields) {
+        this.outputsFields.put(null, fields);
+        return this;
+    }
+
+    /**
+     * Declares the output fields of a named stream.
+     *
+     * @throws IllegalArgumentException if {@code stream} is null or empty.
+     */
+    public BaseCassandraBolt withStreamOutputFields(String stream, Fields fields) {
+        if (stream == null || stream.length() == 0) throw new IllegalArgumentException("'stream' should not be null");
+        this.outputsFields.put(stream, fields);
+        return this;
+    }
+
+    /**
+     * @return the configured result handler, lazily defaulting to a {@link BaseExecutionResultHandler}.
+     */
+    protected ExecutionResultHandler getResultHandler() {
+        if (resultHandler == null) resultHandler = new BaseExecutionResultHandler();
+        return resultHandler;
+    }
+
+    protected CQLStatementTupleMapper getMapper() {
+        return mapper;
+    }
+
+    protected abstract AsyncResultHandler<T> getAsyncHandler();
+
+    protected AsyncExecutor<T> getAsyncExecutor() {
+        return AsyncExecutorProvider.getLocal(session, getAsyncHandler());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Flushes the asynchronous result handler first. Tick tuples call {@link #tick()} and are
+     * acked here; every other tuple is handed to {@link #process(Tuple)}.
+     *
+     * @param input the tuple to process.
+     */
+    @Override
+    public final void execute(Tuple input) {
+        getAsyncHandler().flush(outputCollector);
+        if (TupleUtils.isTick(input)) {
+            tick();
+            outputCollector.ack(input);
+        } else {
+            process(input);
+        }
+    }
+
+    /**
+     * Process a single tuple of input.
+     *
+     * @param input The input tuple to be processed.
+     */
+    protected abstract void process(Tuple input);
+
+    /**
+     * Called on each input tick tuple.
+     */
+    protected abstract void tick();
+
+    /**
+     * {@inheritDoc}
+     *
+     * Declares the default stream first (if configured), then every named stream.
+     * The registered fields are left untouched, so this method can safely be called more than once.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        if (outputsFields == null) {
+            // Transient map lost through serialization: nothing left to declare.
+            return;
+        }
+        Fields defaultFields = outputsFields.get(null);
+        if (defaultFields != null) declarer.declare(defaultFields);
+        for (Map.Entry<String, Fields> entry : outputsFields.entrySet()) {
+            if (entry.getKey() != null) {
+                declarer.declareStream(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        // add tick tuple each second to force acknowledgement of pending tuples.
+        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
+        return conf;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void cleanup() {
+        getAsyncExecutor().shutdown();
+        getAsyncHandler().flush(outputCollector);
+        client.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
new file mode 100644
index 0000000..c4c0110
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.Time;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Cassandra bolt that queues input tuples and writes them as unlogged batches, either on each
+ * tick tuple or when the bounded queue is full.
+ */
+public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
+
+    public static final int DEFAULT_EMIT_FREQUENCY = 2;
+
+    private static final int QUEUE_MAX_SIZE = 1000;
+
+    private LinkedBlockingQueue<Tuple> queue;
+
+    private int tickFrequencyInSeconds;
+
+    private long lastModifiedTimesMillis;
+
+    private String componentID;
+
+    private AsyncResultHandler<List<Tuple>> asyncResultHandler;
+
+    /**
+     * Creates a new {@link BatchCassandraWriterBolt} instance ticking every {@link #DEFAULT_EMIT_FREQUENCY} seconds.
+     *
+     * @param tupleMapper the mapper building CQL statements from input tuples.
+     */
+    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
+        this(tupleMapper, DEFAULT_EMIT_FREQUENCY);
+    }
+
+    /**
+     * Creates a new {@link BatchCassandraWriterBolt} instance.
+     *
+     * @param tupleMapper            the mapper building CQL statements from input tuples.
+     * @param tickFrequencyInSeconds the tick tuple frequency, i.e. the period between two batch flushes.
+     */
+    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) {
+        super(tupleMapper);
+        this.tickFrequencyInSeconds = tickFrequencyInSeconds;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
+        super.prepare(stormConfig, topologyContext, outputCollector);
+        this.componentID = topologyContext.getThisComponentId();
+        this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
+        this.lastModifiedTimesMillis = now();
+    }
+
+    @Override
+    protected AsyncResultHandler<List<Tuple>> getAsyncHandler() {
+        if (asyncResultHandler == null) {
+            asyncResultHandler = new BatchAsyncResultHandler(getResultHandler());
+        }
+        return asyncResultHandler;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Queues the tuple; when the queue is full, the pending tuples are written first.
+     */
+    @Override
+    protected void process(Tuple input) {
+        if (!queue.offer(input)) {
+            LOG.info("{}The message queue is full, preparing batch statement...", logPrefix());
+            prepareAndExecuteStatement();
+            queue.add(input);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void tick() {
+        prepareAndExecuteStatement();
+    }
+
+    /**
+     * Drains the queue, builds the statements of the drained tuples and submits them
+     * asynchronously in batches of at most {@link CassandraConf#getBatchSizeRows()} statements.
+     * Any failure while preparing is logged and reported for all drained tuples.
+     */
+    public void prepareAndExecuteStatement() {
+        int size = queue.size();
+        if (size > 0) {
+            List<Tuple> inputs = new ArrayList<>(size);
+            queue.drainTo(inputs);
+            try {
+                List<PairStatementTuple> psl = buildStatement(inputs);
+
+                int sinceLastModified = updateAndGetSecondsSinceLastModified();
+                LOG.debug("{}Execute cql batches with {} statements after {} seconds", logPrefix(), size, sinceLastModified);
+
+                checkTimeElapsedSinceLastExec(sinceLastModified);
+
+                GroupingBatchBuilder batchBuilder = new GroupingBatchBuilder(cassandraConfConfig.getBatchSizeRows(), psl);
+
+                int batchCount = 0;
+                for (PairBatchStatementTuples batch : batchBuilder) {
+                    LOG.debug("{}Writing data to {} in batches of {} rows.", logPrefix(), cassandraConfConfig.getKeyspace(), batch.getInputs().size());
+                    getAsyncExecutor().execAsync(batch.getStatement(), batch.getInputs());
+                    batchCount++;
+                }
+
+                int pending = getAsyncExecutor().getPendingExec();
+                if (pending > batchCount) {
+                    LOG.warn("{}Currently pending tasks is superior to the number of submit batches({}) : {}", logPrefix(), batchCount, pending);
+                }
+
+            } catch (Throwable r) {
+                // Keep the cause: without it the stack trace of the failure is lost.
+                LOG.error(logPrefix() + "Error(s) occurred while preparing batch statements", r);
+                getAsyncHandler().failure(r, inputs);
+            }
+        }
+    }
+
+    /** Maps every input tuple to its statements, pairing each statement with its source tuple. */
+    private List<PairStatementTuple> buildStatement(List<Tuple> inputs) {
+        List<PairStatementTuple> stmts = new ArrayList<>(inputs.size());
+
+        for (Tuple t : inputs) {
+            List<Statement> sl = getMapper().map(stormConfig, session, t);
+            for (Statement s : sl) {
+                stmts.add(new PairStatementTuple(t, s));
+            }
+        }
+        return stmts;
+    }
+
+    private void checkTimeElapsedSinceLastExec(int sinceLastModified) {
+        if (sinceLastModified > tickFrequencyInSeconds) {
+            LOG.warn("{}The time elapsed since last execution exceeded tick tuple frequency - {} > {} seconds", logPrefix(), sinceLastModified, tickFrequencyInSeconds);
+        }
+    }
+
+    private String logPrefix() {
+        return componentID + " - ";
+    }
+
+    public BatchCassandraWriterBolt withTickFrequency(long time, TimeUnit unit) {
+        this.tickFrequencyInSeconds = (int) unit.toSeconds(time);
+        return this;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
+        return conf;
+    }
+
+    /** Returns the whole seconds elapsed since the previous call and resets the reference time. */
+    private int updateAndGetSecondsSinceLastModified() {
+        long now = now();
+        // Divide in long arithmetic before narrowing: casting the millisecond delta to int
+        // first overflows once it exceeds Integer.MAX_VALUE ms (about 24.8 days).
+        int seconds = (int) ((now - lastModifiedTimesMillis) / 1000);
+        lastModifiedTimesMillis = now;
+        return seconds;
+    }
+
+    private long now() {
+        return Time.currentTimeMillis();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
new file mode 100644
index 0000000..663f26a
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.impl.SingleAsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+
+import java.util.List;
+
+/**
+ * Cassandra bolt that executes the statements of each input tuple asynchronously,
+ * as soon as the tuple is processed.
+ */
+public class CassandraWriterBolt extends BaseCassandraBolt<Tuple> {
+
+    /** Lazily created in {@link #getAsyncHandler()}. */
+    private AsyncResultHandler<Tuple> asyncResultHandler;
+
+    /**
+     * Creates a new {@link CassandraWriterBolt} instance.
+     *
+     * @param tupleMapper the mapper building CQL statements from input tuples.
+     */
+    public CassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
+        super(tupleMapper);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected AsyncResultHandler<Tuple> getAsyncHandler() {
+        if (asyncResultHandler != null) {
+            return asyncResultHandler;
+        }
+        asyncResultHandler = new SingleAsyncResultHandler(getResultHandler());
+        return asyncResultHandler;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * A single statement is submitted on its own; any other number of statements is submitted as a list.
+     */
+    @Override
+    protected void process(Tuple input) {
+        final List<Statement> stmts = getMapper().map(stormConfig, session, input);
+        if (stmts.size() != 1) {
+            getAsyncExecutor().execAsync(stmts, input);
+            return;
+        }
+        getAsyncExecutor().execAsync(stmts.get(0), input);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void tick() {
+        // Nothing to do: statements are submitted as soon as each tuple is processed.
+    }
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java
new file mode 100644
index 0000000..ea63b3d
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.BatchStatement;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Splits a list of (tuple, statement) pairs, in order, into consecutive chunks of at most
+ * {@code batchSizeRows} pairs and turns each chunk into an unlogged {@link BatchStatement}
+ * together with the input tuples of its statements.
+ *
+ * The batches are computed lazily, anew for every call to {@link #iterator()}.
+ */
+public class GroupingBatchBuilder implements Iterable<PairBatchStatementTuples> {
+
+    /** Stateless conversion of one chunk of pairs into a batch; shared by all instances. */
+    private static final Function<List<PairStatementTuple>, PairBatchStatementTuples> TO_UNLOGGED_BATCH = new ToUnloggedBatch();
+
+    private final int batchSizeRows;
+
+    private final List<PairStatementTuple> statements;
+
+    /**
+     * Creates a new {@link GroupingBatchBuilder} instance.
+     *
+     * @param batchSizeRows the maximal number of statements per batch.
+     * @param statements    the pairs to group, in execution order.
+     */
+    public GroupingBatchBuilder(int batchSizeRows, List<PairStatementTuple> statements) {
+        this.batchSizeRows = batchSizeRows;
+        this.statements = statements;
+    }
+
+    @Override
+    public Iterator<PairBatchStatementTuples> iterator() {
+        final Iterable<List<PairStatementTuple>> chunks = Iterables.partition(statements, batchSizeRows);
+        return Iterables.transform(chunks, TO_UNLOGGED_BATCH).iterator();
+    }
+
+    /** Builds one unlogged batch from a chunk, collecting the chunk's tuples alongside. */
+    private static final class ToUnloggedBatch implements Function<List<PairStatementTuple>, PairBatchStatementTuples> {
+        @Override
+        public PairBatchStatementTuples apply(List<PairStatementTuple> chunk) {
+            final BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+            final List<Tuple> tuples = new LinkedList<>();
+            for (PairStatementTuple pair : chunk) {
+                batch.add(pair.getStatement());
+                tuples.add(pair.getTuple());
+            }
+            return new PairBatchStatementTuples(tuples, batch);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java
new file mode 100644
index 0000000..736c482
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.BatchStatement;
+
+import java.util.List;
+
+/**
+ * Associates one batch statement with the list of input tuples whose statements it holds.
+ */
+public class PairBatchStatementTuples {
+
+    /** The batch statement. */
+    private final BatchStatement statement;
+
+    /** The input tuples attached to {@link #statement}. */
+    private final List<Tuple> inputs;
+
+    /**
+     * Creates a new {@link PairBatchStatementTuples} instance.
+     *
+     * @param inputs    the input tuples attached to this batch.
+     * @param statement the batch statement.
+     */
+    public PairBatchStatementTuples(List<Tuple> inputs, BatchStatement statement) {
+        this.statement = statement;
+        this.inputs = inputs;
+    }
+
+    /** @return the batch statement. */
+    public BatchStatement getStatement() {
+        return statement;
+    }
+
+    /** @return the input tuples attached to the batch (the list given at construction, not a copy). */
+    public List<Tuple> getInputs() {
+        return inputs;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java
new file mode 100644
index 0000000..8f50574
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.Statement;
+
+/**
+ * Associates one CQL statement with the input tuple it was built from.
+ */
+public class PairStatementTuple {
+
+    /** The CQL statement. */
+    private final Statement statement;
+
+    /** The input tuple attached to {@link #statement}. */
+    private final Tuple tuple;
+
+    /**
+     * Creates a new {@link PairStatementTuple} instance.
+     *
+     * @param tuple     the input tuple.
+     * @param statement the statement attached to the tuple.
+     */
+    public PairStatementTuple(Tuple tuple, Statement statement) {
+        this.statement = statement;
+        this.tuple = tuple;
+    }
+
+    /** @return the CQL statement. */
+    public Statement getStatement() {
+        return statement;
+    }
+
+    /** @return the input tuple. */
+    public Tuple getTuple() {
+        return tuple;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
new file mode 100644
index 0000000..ccee468
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.google.common.base.Objects;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Configuration used by cassandra storm components, read from the storm configuration map.
+ */
+public class CassandraConf implements Serializable {
+
+    public static final String CASSANDRA_USERNAME = "cassandra.username";
+    public static final String CASSANDRA_PASSWORD = "cassandra.password";
+    public static final String CASSANDRA_KEYSPACE = "cassandra.keyspace";
+    public static final String CASSANDRA_CONSISTENCY_LEVEL = "cassandra.output.consistencyLevel";
+    public static final String CASSANDRA_NODES = "cassandra.nodes";
+    public static final String CASSANDRA_PORT = "cassandra.port";
+    public static final String CASSANDRA_BATCH_SIZE_ROWS = "cassandra.batch.size.rows";
+
+    /** Port used when none is configured (the Cassandra native protocol default). */
+    private static final int DEFAULT_PORT = 9042;
+
+    /** Maximal number of rows per batch used when none is configured. */
+    private static final int DEFAULT_BATCH_SIZE_ROWS = 100;
+
+    /**
+     * The authorized cassandra username.
+     */
+    private String username;
+    /**
+     * The authorized cassandra password.
+     */
+    private String password;
+    /**
+     * The cassandra keyspace.
+     */
+    private String keyspace;
+    /**
+     * List of contacts nodes.
+     */
+    private String[] nodes = {"localhost"};
+
+    /**
+     * The port used to connect to nodes. Defaults to the same value as when it is
+     * absent from the storm configuration (previously 9092 here, inconsistently).
+     */
+    private int port = DEFAULT_PORT;
+
+    /**
+     * Consistency level used to write statements.
+     */
+    private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
+    /**
+     * The maximal numbers of rows per batch.
+     */
+    private int batchSizeRows = DEFAULT_BATCH_SIZE_ROWS;
+
+    /**
+     * Creates a new {@link CassandraConf} instance.
+     */
+    public CassandraConf() {
+        super();
+    }
+
+    /**
+     * Creates a new {@link CassandraConf} instance.
+     *
+     * Numeric settings ({@link #CASSANDRA_PORT}, {@link #CASSANDRA_BATCH_SIZE_ROWS}) may be given either
+     * as a {@link Number} or as a {@link String}. {@link #CASSANDRA_NODES} is a comma-separated list of hosts.
+     *
+     * @param conf The storm configuration.
+     * @throws IllegalArgumentException if {@link #CASSANDRA_KEYSPACE} is missing, or a setting has an invalid value.
+     */
+    public CassandraConf(Map<String, Object> conf) {
+        this.username = getOrElse(conf, CASSANDRA_USERNAME, null);
+        this.password = getOrElse(conf, CASSANDRA_PASSWORD, null);
+        this.keyspace = get(conf, CASSANDRA_KEYSPACE);
+        this.consistencyLevel = ConsistencyLevel.valueOf(getOrElse(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
+        this.nodes = splitNodes(getOrElse(conf, CASSANDRA_NODES, "localhost"));
+        this.batchSizeRows = getInt(conf, CASSANDRA_BATCH_SIZE_ROWS, DEFAULT_BATCH_SIZE_ROWS);
+        this.port = getInt(conf, CASSANDRA_PORT, DEFAULT_PORT);
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public String getKeyspace() {
+        return keyspace;
+    }
+
+    public String[] getNodes() {
+        return nodes;
+    }
+
+    public ConsistencyLevel getConsistencyLevel() {
+        return consistencyLevel;
+    }
+
+    public int getBatchSizeRows() {
+        return batchSizeRows;
+    }
+
+    public int getPort() {
+        return this.port;
+    }
+
+    /** Splits a comma-separated host list, trimming blanks around each host name. */
+    private static String[] splitNodes(String nodes) {
+        String[] hosts = nodes.split(",");
+        for (int i = 0; i < hosts.length; i++) {
+            hosts[i] = hosts[i].trim();
+        }
+        return hosts;
+    }
+
+    /**
+     * Reads an integer setting that may be stored as any {@link Number} (storm configurations
+     * can hold Integer or Long values) or as a decimal {@link String}.
+     *
+     * @throws NumberFormatException if a String value is not a valid integer.
+     */
+    private static int getInt(Map<String, Object> conf, String key, int def) {
+        Object value = conf.get(key);
+        if (value == null) {
+            return def;
+        }
+        if (value instanceof Number) {
+            return ((Number) value).intValue();
+        }
+        return Integer.parseInt(value.toString().trim());
+    }
+
+    @SuppressWarnings("unchecked") // the caller's target type decides T; a mismatch fails at the call site
+    private <T> T get(Map<String, Object> conf, String key) {
+        Object o = conf.get(key);
+        if (o == null) {
+            throw new IllegalArgumentException("No '" + key + "' value found in configuration!");
+        }
+        return (T) o;
+    }
+
+    @SuppressWarnings("unchecked") // the caller's target type decides T; a mismatch fails at the call site
+    private <T> T getOrElse(Map<String, Object> conf, String key, T def) {
+        T o = (T) conf.get(key);
+        return (o == null) ? def : o;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * The password is masked so that this configuration can be logged safely.
+     */
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this)
+                .add("username", username)
+                .add("password", password == null ? null : "*****")
+                .add("keyspace", keyspace)
+                .add("nodes", Arrays.toString(nodes))
+                .add("port", port)
+                .add("consistencyLevel", consistencyLevel)
+                .add("batchSizeRows", batchSizeRows)
+                .toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
new file mode 100644
index 0000000..886f6d3
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.storm.cassandra.client; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PlainTextAuthProvider; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy; +import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; +import com.datastax.driver.core.policies.RoundRobinPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; +import org.apache.commons.lang3.StringUtils; +import org.apache.storm.cassandra.context.BaseBeanFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Default interface to build cassandra Cluster from the a Storm Topology configuration. + */ +public class ClusterFactory extends BaseBeanFactory<Cluster> { + + /** + * Creates a new Cluster based on the specified configuration. + * @param stormConf the storm configuration. + * @return a new a new {@link com.datastax.driver.core.Cluster} instance. + */ + @Override + protected Cluster make(Map<String, Object> stormConf) { + CassandraConf cassandraConf = new CassandraConf(stormConf); + + Cluster.Builder cluster = Cluster.builder() + .withoutJMXReporting() + .withoutMetrics() + .addContactPoints(cassandraConf.getNodes()) + .withPort(cassandraConf.getPort()) + .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) + .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, TimeUnit.MINUTES.toMillis(1))) + .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy())); + + final String username = cassandraConf.getUsername(); + final String password = cassandraConf.getPassword(); + + if( StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) { + cluster.withAuthProvider(new PlainTextAuthProvider(username, password)); + } + + QueryOptions options = new QueryOptions() + .setConsistencyLevel(cassandraConf.getConsistencyLevel()); + cluster.withQueryOptions(options); + + + return cluster.build(); + } +} 
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java new file mode 100644 index 0000000..3bd80ed --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.client; + +import com.datastax.driver.core.Session; + +public interface SimpleClient { + + /** + * Creates a new session on this cluster. + * * + * @return a new session on this cluster. + * @throws com.datastax.driver.core.exceptions.NoHostAvailableException if we cannot reach any cassandra contact points. + */ + Session connect(); + + /** + * Close the underlying {@link com.datastax.driver.core.Cluster} instance. + */ + void close(); + + /** + * Checks whether the underlying cluster instance is closed. 
+ */ + boolean isClose(); +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java new file mode 100644 index 0000000..412cb70 --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.client; + +import java.util.Map; + +/** + * Default interface to provide cassandra client. + */ +public interface SimpleClientProvider { + + /** + * Creates a new cassandra client based on the specified storm configuration. + * + * @param config The configuration passed to the storm topology. + * @return a new {@link SimpleClient} instance. 
+ */ + SimpleClient getClient(Map<String, Object> config); +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java new file mode 100644 index 0000000..8ed9293 --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.cassandra.client.impl; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.base.Preconditions; +import org.apache.storm.cassandra.client.SimpleClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.Serializable; +import java.util.Set; + +/** + * Simple class to wrap cassandra {@link com.datastax.driver.core.Cluster} instance. + */ +public class DefaultClient implements SimpleClient, Closeable, Serializable { + + private final static Logger LOG = LoggerFactory.getLogger(DefaultClient.class); + + private String keyspace; + + private Cluster cluster; + + private Session session; + + /** + * Create a new {@link DefaultClient} instance. + * + * @param cluster a cassandra cluster client. + */ + public DefaultClient(Cluster cluster, String keyspace) { + Preconditions.checkNotNull(cluster, "Cluster cannot be 'null"); + this.cluster = cluster; + this.keyspace = keyspace; + + } + + public Set<Host> getAllHosts() { + Metadata metadata = getMetadata(); + return metadata.getAllHosts(); + } + + public Metadata getMetadata() { + return cluster.getMetadata(); + } + + + private String getExecutorName() { + Thread thread = Thread.currentThread(); + return thread.getName(); + } + /** + * {@inheritDoc} + */ + @Override + public synchronized Session connect() throws NoHostAvailableException { + if( isDisconnected() ) { + LOG.info(String.format("Connected to cluster: %s", cluster.getClusterName())); + for ( Host host : getAllHosts()) + LOG.info(String.format("Datacenter: %s; Host: %s; Rack: %s", host.getDatacenter(), host.getAddress(), host.getRack())); + + LOG.info(String.format("Connect to cluster using keyspace %s", keyspace)); + session = cluster.connect(keyspace); + } else { + 
LOG.warn(String.format("%s - Already connected to cluster: %s", getExecutorName(), cluster.getClusterName())); + } + + if( session.isClosed() ) { + LOG.warn("Session has been closed - create new one!"); + this.session = cluster.newSession(); + } + return session; + } + + /** + * Checks whether the client is already connected to the cluster. + */ + protected boolean isDisconnected() { + return session == null; + } + + /** + * {@inheritDoc} + */ + @Override + public void close( ) { + if( cluster != null && !cluster.isClosed() ) { + LOG.info(String.format("Try to close connection to cluster: %s", cluster.getClusterName())); + session.close(); + cluster.close(); + } + } + /** + * {@inheritDoc} + */ + @Override + public boolean isClose() { + return this.cluster.isClosed(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java new file mode 100644 index 0000000..1ce6bac --- /dev/null +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.context; + +import java.util.Map; + +/** + * Base BeanProvider implementation. + */ +public abstract class BaseBeanFactory<T> implements BeanFactory<T> { + + protected WorkerCtx context; + + protected volatile T instance; + /** + * {@inheritDoc} + */ + @Override + public void setStormContext(WorkerCtx context) { + this.context = context; + } + + /** + * {@inheritDoc} + */ + @Override + public synchronized T get(Map<String, Object> stormConf) { + if( instance != null) return instance; + return instance = make(stormConf); + } + /** + * Return a new instance of T. + */ + protected abstract T make(final Map<String, Object> stormConf); + /** + * {@inheritDoc} + */ + @Override + public BeanFactory<T> newInstance() { + Class<? extends BaseBeanFactory> clazz = this.getClass(); + try { + BaseBeanFactory factory = clazz.newInstance(); + factory.setStormContext(this.context); + return factory; + } catch (IllegalAccessException | InstantiationException e) { + throw new RuntimeException("Cannot create a new instance of " + clazz.getSimpleName(), e); + } + } +} \ No newline at end of file
