This might be useful to you as well:

Netty issue (I'm using Storm-Kafka and Kafka as well). I removed the old
Netty jar from Storm's lib directory:
rm /opt/programs/storm/lib/netty-3.2.2.Final.jar


CassandraWriterBolt Topology implementation:
        CassandraWriterBolt cassandraWriterImpressionsBolt = new CassandraWriterBolt(
                properties.getProperty("cassandra.host"),
                properties.getProperty("cassandra.host.keyspace"),
                properties.getProperty("cassandra.cql.writer.impressions")
        );
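
In case it helps, here is a rough sketch of what those three properties might
look like. The host, keyspace, table, and column names are placeholders I made
up for illustration, not my real config. The point is that the CQL needs one ?
marker per tuple value, because the bolt binds input.getValues() positionally.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class WriterPropertiesSketch {

    // Hypothetical contents of the properties file; every value is a placeholder.
    static final String EXAMPLE =
            "cassandra.host=127.0.0.1\n"
          + "cassandra.host.keyspace=example_keyspace\n"
          + "cassandra.cql.writer.impressions="
          + "INSERT INTO impressions (id, ts, payload) VALUES (?, ?, ?)\n";

    // Loads the example text the same way a properties file would be loaded.
    static Properties load() throws IOException {
        Properties properties = new Properties();
        properties.load(new StringReader(EXAMPLE));
        return properties;
    }

    // Counts the '?' bind markers so they can be compared with the tuple size.
    static int bindMarkers(String cql) {
        int count = 0;
        for (char c : cql.toCharArray()) {
            if (c == '?') {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        Properties properties = load();
        String cql = properties.getProperty("cassandra.cql.writer.impressions");
        System.out.println(properties.getProperty("cassandra.host"));
        System.out.println(bindMarkers(cql));
    }
}
```

With this placeholder statement, the upstream component would have to emit
exactly three values per tuple, in the column order shown.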


CassandraWriterBolt.java

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.datastax.driver.core.*;
import com.datastax.driver.core.policies.*;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * CassandraWriterBolt.java
 * CQL Prepared Statement Writer to Cassandra Cluster as Storm Bolt.
 *
 * @author Dan DeCapria, Copyright (c) 2011-2014
 * @since 19 August 2014
 * @version 0.1, 26 August 2014
 */
public class CassandraWriterBolt extends BaseRichBolt {

    private OutputCollector collector;
    private static final Logger logger = LoggerFactory.getLogger(CassandraWriterBolt.class);
    private TopologyContext context;

    private String node;
    private String key_space;
    private String cql;

    private Cluster cluster;
    private Session session;
    private PreparedStatement prepared_statement;

    public CassandraWriterBolt(String node, String key_space, String cql) {
        this.node = node;
        this.key_space = key_space;
        this.cql = cql;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //  we don't emit anything from here
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.context = context;
        this.collector = collector;
        this.cluster = Cluster.builder()
                .withoutJMXReporting()
                .withoutMetrics()
                .addContactPoint(this.node)
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, 5000L))
                .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()))
                .build();

        this.session = this.cluster.connect(this.key_space);
        this.prepared_statement = this.session.prepare(this.cql);
    }

    @Override
    public void execute(Tuple input) {
        try {
            // bind the tuple's values positionally to the statement's ? markers
            this.session.execute(this.prepared_statement.bind(input.getValues().toArray()));
            // ack only after a successful write
            this.collector.ack(input);
        } catch (Exception ex) {
            logger.error("Exception: " + ex.getMessage() + ";\t tuple = " + input);
            this.collector.reportError(ex);
            // a failed tuple must not also be acked
            this.collector.fail(input);
        }
    }

    @Override
    public void cleanup() {
        this.session.close();
        this.cluster.close();
    }

}
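
One thing to watch in execute(): each tuple should be acked or failed exactly
once, so the ack belongs on the success path only. Here is a stand-alone sketch
of that pattern. RecordingCollector and write() are hypothetical stand-ins for
Storm's OutputCollector and the Cassandra call, so this runs without either
library.

```java
import java.util.ArrayList;
import java.util.List;

public class AckOrFailSketch {

    // Hypothetical stand-in for Storm's OutputCollector; it only records calls.
    static class RecordingCollector {
        final List<String> calls = new ArrayList<>();

        void ack(String tuple) {
            calls.add("ack:" + tuple);
        }

        void fail(String tuple) {
            calls.add("fail:" + tuple);
        }
    }

    // Hypothetical stand-in for the Cassandra write; throws for the tuple "bad".
    static void write(String tuple) {
        if ("bad".equals(tuple)) {
            throw new IllegalStateException("write failed");
        }
    }

    static void execute(String tuple, RecordingCollector collector) {
        try {
            write(tuple);
            collector.ack(tuple);   // reached only when the write succeeded
        } catch (RuntimeException ex) {
            collector.fail(tuple);  // the failed tuple is never acked
        }
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();
        execute("good", collector);
        execute("bad", collector);
        System.out.println(collector.calls); // prints [ack:good, fail:bad]
    }
}
```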


POM snippet:

        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>2.0.3</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>
            <version>1.0.5</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>net.jpountz.lz4</groupId>
            <artifactId>lz4</artifactId>
            <version>1.2.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.hmsonline</groupId>
            <artifactId>storm-cassandra-cql</artifactId>
            <version>0.1.8</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>


On Thu, Sep 25, 2014 at 2:27 PM, Strulovitch, Zack <zstrulovi...@tycoint.com> wrote:

>  Thank you Robert and Harsha. I like Robert's suggestion since it was
> updated recently.
>
>
>  *From:* Robert Lee [lee.robert...@gmail.com]
> *Sent:* Thursday, September 25, 2014 2:24 PM
> *To:* user@storm.incubator.apache.org
> *Subject:* Re: Cassandra bolt
>
>   If you are looking for CQL, I'd suggest:
>
> https://github.com/hmsonline/storm-cassandra-cql
>
> On Thu, Sep 25, 2014 at 2:22 PM, Harsha <st...@harsha.io> wrote:
>
>>  Did you try https://github.com/ptgoetz/storm-cassandra?
>>
>>
>> On Thu, Sep 25, 2014, at 11:20 AM, Strulovitch, Zack wrote:
>>
>>   I've updated to 0.9.2 from the pre-Apache version 0.9.0.1, which broke my
>> Cassandra bolt (implemented using this code:
>> https://github.com/tjake/stormscraper ).
>>  According to some posts, this is due to a Netty conflict. Could anyone
>> please suggest an alternative, reliable Cassandra bolt implementation?
>>  Thank you in advance,
>>  Zack
>>
>>
>>  ------------------------------
>>
>>  This e-mail contains privileged and confidential information intended
>> for the use of the addressees named above. If you are not the intended
>> recipient of this e-mail, you are hereby notified that you must not
>> disseminate, copy or take any action in respect of any information
>> contained in it. If you have received this e-mail in error, please notify
>> the sender immediately by e-mail and immediately destroy this e-mail and
>> its attachments.
>>
>>
>>
>



-- 
Dan DeCapria
CivicScience, Inc.
Back-End Data IS/BI/DM/ML Specialist
