This might be useful to you as well.

Netty issue (I'm using Storm-Kafka and Kafka as well):

    rm /opt/programs/storm/lib/netty-3.2.2.Final.jar
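Before removing a jar, it may help to confirm which Netty jars are actually sitting in Storm's lib directory. The following is only a minimal sketch: the class name NettyJarCheck and the method findNettyJars are made up for illustration and are not part of Storm, and the default path is simply the one from the command above, so adjust it to your own install.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper (not part of Storm): lists the Netty jars in a lib directory. */
public class NettyJarCheck {

    /** Returns the sorted names of files matching netty*.jar directly inside libDir. */
    public static List<String> findNettyJars(Path libDir) throws IOException {
        List<String> names = new ArrayList<>();
        // The glob is matched against each entry's file name, not its full path.
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(libDir, "netty*.jar")) {
            for (Path jar : jars) {
                names.add(jar.getFileName().toString());
            }
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default: the lib path used in this mail; pass another path as the first argument.
        Path libDir = Paths.get(args.length > 0 ? args[0] : "/opt/programs/storm/lib");
        List<String> jars = findNettyJars(libDir);
        for (String name : jars) {
            System.out.println(name);
        }
        if (jars.size() > 1) {
            System.out.println("More than one Netty jar found; a version conflict is possible.");
        }
    }
}
```

If more than one Netty jar is listed, that is a hint (not proof) that the conflict comes from mixed versions on the worker classpath.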
CassandraWriterBolt

Topology implementation:

    CassandraWriterBolt cassandraWriterImpressionsBolt = new CassandraWriterBolt(
            properties.getProperty("cassandra.host"),
            properties.getProperty("cassandra.host.keyspace"),
            properties.getProperty("cassandra.cql.writer.impressions")
    );

CassandraWriterBolt.java

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    import com.datastax.driver.core.*;
    import com.datastax.driver.core.policies.*;
    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
     * CassandraWriterBolt.java
     * CQL Prepared Statement Writer to Cassandra Cluster as Storm Bolt.
     *
     * @author Dan DeCapria, Copyright (c) 2011-2014
     * @since 19 August 2014
     * @version 0.1, 26 August 2014
     */
    public class CassandraWriterBolt extends BaseRichBolt {

        private static final Logger logger = LoggerFactory.getLogger(CassandraWriterBolt.class);

        private OutputCollector collector;
        private TopologyContext context;
        private String node;
        private String key_space;
        private String cql;
        // Created in prepare() on the worker, so they are still null when the bolt is serialized.
        private Cluster cluster;
        private Session session;
        private PreparedStatement prepared_statement;

        public CassandraWriterBolt(String node, String key_space, String cql) {
            this.node = node;
            this.key_space = key_space;
            this.cql = cql;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // we don't emit anything from here
        }

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.context = context;
            this.collector = collector;
            this.cluster = Cluster.builder()
                    .withoutJMXReporting()
                    .withoutMetrics()
                    .addContactPoint(this.node)
                    .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                    .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, 5000L))
                    .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()))
                    .build();
            this.session = this.cluster.connect(this.key_space);
            // Prepare the statement once per bolt instance; execute() only binds values.
            this.prepared_statement = this.session.prepare(this.cql);
        }

        @Override
        public void execute(Tuple input) {
            try {
                // The tuple's values are bound, in order, to the statement's placeholders.
                this.session.execute(this.prepared_statement.bind(input.getValues().toArray()));
                // Ack only after a successful write, so a failed tuple is not also acked.
                this.collector.ack(input);
            } catch (Exception ex) {
                logger.error("Exception: " + ex.getMessage() + ";\t tuple = " + input);
                this.collector.reportError(ex);
                this.collector.fail(input);
            }
        }

        @Override
        public void cleanup() {
            this.session.close();
            this.cluster.close();
        }
    }

POM snippet:

    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>2.0.3</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.xerial.snappy</groupId>
        <artifactId>snappy-java</artifactId>
        <version>1.0.5</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>net.jpountz.lz4</groupId>
        <artifactId>lz4</artifactId>
        <version>1.2.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.hmsonline</groupId>
        <artifactId>storm-cassandra-cql</artifactId>
        <version>0.1.8</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>

On Thu, Sep 25, 2014 at 2:27 PM, Strulovitch, Zack <zstrulovi...@tycoint.com> wrote:

> Thank you Robert and Harsha. I like Robert's suggestion since it was
> updated recently.
>
> *From:* Robert Lee [lee.robert...@gmail.com]
> *Sent:* Thursday, September 25, 2014 2:24 PM
> *To:* user@storm.incubator.apache.org
> *Subject:* Re: Cassandra bolt
>
> If you are looking for CQL, I'd suggest:
>
> https://github.com/hmsonline/storm-cassandra-cql
>
> On Thu, Sep 25, 2014 at 2:22 PM, Harsha <st...@harsha.io> wrote:
>
>> Did you try https://github.com/ptgoetz/storm-cassandra?
>>
>> On Thu, Sep 25, 2014, at 11:20 AM, Strulovitch, Zack wrote:
>>
>> I've updated to 0.9.2 from the pre-Apache version 0.9.0.1, which broke my
>> Cassandra bolt (implemented using this code:
>> https://github.com/tjake/stormscraper).
>> According to some posts, this is due to a Netty conflict. Could anyone
>> please suggest an alternative, reliable Cassandra bolt implementation?
>> Thank you in advance,
>> Zack
>>
>> ------------------------------
>>
>> This e-mail contains privileged and confidential information intended
>> for the use of the addressees named above. If you are not the intended
>> recipient of this e-mail, you are hereby notified that you must not
>> disseminate, copy or take any action in respect of any information
>> contained in it. If you have received this e-mail in error, please notify
>> the sender immediately by e-mail and immediately destroy this e-mail and
>> its attachments.

--
Dan DeCapria
CivicScience, Inc.
Back-End Data IS/BI/DM/ML Specialist