Thank you very much Dan

From: Dan DeCapria, CivicScience [[email protected]]
Sent: Thursday, September 25, 2014 3:08 PM
To: [email protected]
Subject: Re: Cassandra bolt

This might be useful to you as well:

Netty Issue (I'm using Storm-Kafka and Kafka as well):
rm /opt/programs/storm/lib/netty-3.2.2.Final.jar


CassandraWriterBolt Topology implementation:
        CassandraWriterBolt cassandraWriterImpressionsBolt = new 
CassandraWriterBolt(
                properties.getProperty("cassandra.host"),
                properties.getProperty("cassandra.host.keyspace"),
                properties.getProperty("cassandra.cql.writer.impressions")
        );


CassandraWriterBolt.java

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.datastax.driver.core.*;
import com.datastax.driver.core.policies.*;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * CassandraWriterBolt.java
 * CQL Prepared Statement Writer to Cassandra Cluster as Storm Bolt.
 *
 * @author Dan DeCapria, Copyright (c) 2011-2014
 * @since 19 August 2014
 * @version 0.1, 26 August 2014
 */
public class CassandraWriterBolt extends BaseRichBolt {

    private OutputCollector collector;
    private static final Logger logger = 
LoggerFactory.getLogger(CassandraWriterBolt.class);
    private TopologyContext context;

    private String node;
    private String key_space;
    private String cql;

    private Cluster cluster;
    private Session session;
    private PreparedStatement prepared_statement;

    public CassandraWriterBolt(String node, String key_space, String cql) {
        this.node = node;
        this.key_space = key_space;
        this.cql = cql;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //  we don't emit anything from here
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector 
collector) {
        this.context = context;
        this.collector = collector;
        this.cluster = Cluster.builder()
                .withoutJMXReporting()
                .withoutMetrics()
                .addContactPoint(this.node)
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, 
5000L))
                .withLoadBalancingPolicy(new TokenAwarePolicy(new 
RoundRobinPolicy()))
                .build();

        this.session = this.cluster.connect(this.key_space);
        this.prepared_statement = this.session.prepare(this.cql);
    }

    @Override
    public void execute(Tuple input) {
        try {
            
this.session.execute(this.prepared_statement.bind(input.getValues().toArray()));
        } catch (Exception ex) {
            logger.error("Exception: " + ex.getMessage() + ";\t tuple = " + 
input);
            this.collector.reportError(ex);
            this.collector.fail(input);
        }
        this.collector.ack(input);
    }

    @Override
    public void cleanup() {
        this.session.close();
        this.cluster.close();
    }

}


POM snippet:

        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>2.0.3</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>
            <version>1.0.5</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>net.jpountz.lz4</groupId>
            <artifactId>lz4</artifactId>
            <version>1.2.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.hmsonline</groupId>
            <artifactId>storm-cassandra-cql</artifactId>
            <version>0.1.8</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>


On Thu, Sep 25, 2014 at 2:27 PM, Strulovitch, Zack 
<[email protected]<mailto:[email protected]>> wrote:
Thank you Robert and Harsha. I like Robert's suggestion since it was updated 
recently.


From: Robert Lee [[email protected]<mailto:[email protected]>]
Sent: Thursday, September 25, 2014 2:24 PM
To: [email protected]<mailto:[email protected]>
Subject: Re: Cassandra bolt

If you are looking for CQL, I'd suggest:

https://github.com/hmsonline/storm-cassandra-cql

On Thu, Sep 25, 2014 at 2:22 PM, Harsha 
<[email protected]<mailto:[email protected]>> wrote:
did you tried https://github.com/ptgoetz/storm-cassandra.


On Thu, Sep 25, 2014, at 11:20 AM, Strulovitch, Zack wrote:
I've updated to 0.9.2 from pre-apache version 0.9.0.1 (which broke my Cassandra 
bolt implemented using this code : 
https://github.com/tjake/stormscraperhttps://github.com/tjake/stormscraper )
According to some posts, this is due to netty conflict. Could anyone please 
suggest me an alternative reliable Cassandra bolt implementation?
Thank you in advance,
Zack


________________________________

This e-mail contains privileged and confidential information intended for the 
use of the addressees named above. If you are not the intended recipient of 
this e-mail, you are hereby notified that you must not disseminate, copy or 
take any action in respect of any information contained in it. If you have 
received this e-mail in error, please notify the sender immediately by e-mail 
and immediately destroy this e-mail and its attachments.



________________________________

This e-mail contains privileged and confidential information intended for the 
use of the addressees named above. If you are not the intended recipient of 
this e-mail, you are hereby notified that you must not disseminate, copy or 
take any action in respect of any information contained in it. If you have 
received this e-mail in error, please notify the sender immediately by e-mail 
and immediately destroy this e-mail and its attachments.



--
Dan DeCapria
CivicScience, Inc.
Back-End Data IS/BI/DM/ML Specialist

________________________________

This e-mail contains privileged and confidential information intended for the 
use of the addressees named above. If you are not the intended recipient of 
this e-mail, you are hereby notified that you must not disseminate, copy or 
take any action in respect of any information contained in it. If you have 
received this e-mail in error, please notify the sender immediately by e-mail 
and immediately destroy this e-mail and its attachments.

Reply via email to