Hi Ashic,

At the moment I see two options:

1) You could use the CassandraConnector object to execute your specialized query. The recommended pattern is to do that within rdd.foreachPartition(...) in order to amortize the DB connection setup over the number of elements in one partition. Something like this:

    import scala.collection.JavaConverters._

    val sparkContext = ???
    val cassandraConnector = CassandraConnector(conf)
    val dataRdd = ??? // I assume this is the source of data
    val rddThingById = dataRdd.map(elem => transformToIdByThing(elem))

    rddThingById.foreachPartition { partition =>
      cassandraConnector.withSessionDo { session =>
        partition.foreach { record =>
          // The bound value for a set append must itself be a set,
          // and the bind order follows the placeholders in the query.
          session.execute("update foo set things = things + ? where id = ?",
            Set(record.thing).asJava, record.id)
        }
      }
    }
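If your partitions are large, it may also be worth preparing the statement once per partition instead of sending the raw CQL string for every row. A rough, untested sketch along the same lines (it reuses the cassandraConnector and rddThingById values from above):

    import scala.collection.JavaConverters._

    rddThingById.foreachPartition { partition =>
      cassandraConnector.withSessionDo { session =>
        // Prepare once per partition, then bind once per record.
        val stmt = session.prepare("update foo set things = things + ? where id = ?")
        partition.foreach { record =>
          session.execute(stmt.bind(Set(record.thing).asJava, record.id))
        }
      }
    }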
2) You could change your data model slightly in order to avoid the update operation. Actually, the Cassandra representation of a set is nothing more than column -> timestamp, where the column name is an element of the set. So Set(a, b, c) = Column(a) -> ts, Column(b) -> ts, Column(c) -> ts.

So, if you desugar your data model, you could use something like:

    create table foo (
      id text,
      bar int,
      things text,
      ts timestamp,
      primary key ((id, bar), things)
    )

And your Spark process would be reduced to:

    import com.datastax.spark.connector._

    val sparkContext = ???
    val dataRdd = ??? // I assume this is the source of data
    dataRdd
      .map(elem => transformToIdBarThingByTimeStamp(elem))
      .saveToCassandra(ks, "foo", SomeColumns("id", "bar", "things", "ts"))
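Note that with this model the set has to be reassembled at read time by regrouping the per-element rows. A rough, untested sketch of the read side, assuming the table above and the keyspace name held in ks:

    import com.datastax.spark.connector._
    import org.apache.spark.SparkContext._

    // Regroup the desugared rows into a Set of things per (id, bar).
    val thingsByKey = sparkContext.cassandraTable(ks, "foo")
      .map(row => ((row.getString("id"), row.getInt("bar")), row.getString("things")))
      .groupByKey()
      .mapValues(_.toSet)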
Hope this helps.

-kr, Gerard.

On Thu, Oct 23, 2014 at 2:48 PM, Ashic Mahtab <as...@live.com> wrote:

> Hi Gerard,
> Thanks for the response. Here's the scenario:
>
> The target cassandra schema looks like this:
>
>   create table foo (
>     id text primary key,
>     bar int,
>     things set<text>
>   )
>
> The source in question is a SQL Server source providing the necessary
> data. The source goes over the same "id" multiple times, adding things to
> the "things" set each time. With inserts, it'll replace "things" with a new
> set of one element, instead of appending that item. As such, the query
>
>   update foo set things = things + ? where id = ?
>
> solves the problem. If I had to stick with saveToCassandra, I'd have to
> read in the existing row for each row, and then write it back. Since this
> is happening in parallel on multiple machines, that would likely cause
> discrepancies where a node will read and update to older values. Hence my
> question about session management in order to issue custom update queries.
>
> Thanks,
> Ashic.
>
> ------------------------------
> Date: Thu, 23 Oct 2014 14:27:47 +0200
> Subject: Re: Spark Cassandra Connector proper usage
> From: gerard.m...@gmail.com
> To: as...@live.com
>
> Ashic,
> With the Spark-Cassandra connector you would typically create an RDD from
> the source table, update what you need, filter out what you don't update,
> and write it back to Cassandra.
>
> Kr, Gerard
>
> On Oct 23, 2014 1:21 PM, "Ashic Mahtab" <as...@live.com> wrote:
>
> I'm looking to use Spark for some ETL, which will mostly consist of
> "update" statements (a column is a set that'll be appended to, so a simple
> insert is likely not going to work). As such, it seems like issuing CQL
> queries to import the data is the best option. Using the Spark Cassandra
> Connector, I see I can do this:
>
> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra
>
> Now I don't want to open a session and close it for every row in the
> source (am I right in not wanting this? Usually, I have one session for the
> entire process, and keep using that in "normal" apps). However, it says
> that the connector is serializable, but the session is obviously not. So,
> wrapping the whole import inside a single "withSessionDo" seems like it'll
> cause problems. I was thinking of using something like this:
>
>   class CassandraStorage(conf: SparkConf) {
>     val session = CassandraConnector(conf).openSession()
>     def store(t: Thingy): Unit = {
>       // session.execute cql goes here
>     }
>   }
>
> Is this a good approach? Do I need to worry about closing the session?
> Where / how best would I do that? Any pointers are appreciated.
>
> Thanks,
> Ashic.