Hi,

(Apologies for the cross-post from SO)

I'm trying to create Cassandra SSTables from the results of a batch
computation in Spark. Ideally, each partition would create the SSTable for
the data it holds, in order to parallelize the process as much as possible
(and probably even stream it to the Cassandra ring as well).

After the initial hurdles with the CQLSSTableWriter (like requiring the
yaml file), I'm now confronted with this issue:

java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts
    at org.apache.cassandra.config.Schema.load(Schema.java:347)
    at org.apache.cassandra.config.Schema.load(Schema.java:112)
    at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336)

I'm creating a writer on each parallel partition like this:

def store(rdd: RDD[Message]) = {
  rdd.foreachPartition { msgIterator =>
    // one writer per partition, all targeting the same table
    val writer = CQLSSTableWriter.builder()
      .inDirectory("/tmp/cass")
      .forTable(schema)
      .using(insertSttmt)
      .build()
    msgIterator.foreach(msg => {...})
  }
}

And if I'm reading the exception correctly, I can only create one writer
per table in one JVM. Digging a bit further in the code, it looks like the
Schema.load(...) singleton enforces that limitation.
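
The only workaround I can think of is something along the lines of the sketch
below (purely illustrative: SharedWriter is a name I made up, and all it does
is funnel every task in an executor JVM through a single writer):

import org.apache.cassandra.io.sstable.CQLSSTableWriter

// Hypothetical per-JVM holder: builds the writer lazily, exactly once,
// so Schema.load() is only ever hit once for this table in this JVM.
object SharedWriter {
  private var writer: CQLSSTableWriter = _

  def get(schema: String, insertSttmt: String): CQLSSTableWriter = synchronized {
    if (writer == null) {
      writer = CQLSSTableWriter.builder()
        .inDirectory("/tmp/cass")
        .forTable(schema)
        .using(insertSttmt)
        .build()
    }
    writer
  }
}

But that just moves the problem: all partitions running on that executor now
share one writer.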

Writes to a shared writer are probably not thread-safe anyway, and even if
they were, the contention created by all the parallel tasks trying to dump a
few GB of data to disk at the same time would defeat the purpose of using
SSTables for bulk upload in the first place.

So, are there ways to use the CQLSSTableWriter concurrently?

If not, what is the next best option to load batch data at high throughput
into Cassandra?

Will the upcoming Spark-Cassandra integration help with this? (i.e. should I
just sit back and relax, and the problem will solve itself?)

Thanks,

Gerard.
