Hi Ashic,

At the moment I see two options:

1) You could use the CassandraConnector object to execute your specialized
query. The recommended pattern is to do that within a
rdd.foreachPartition(...) in order to amortize DB connection setup over the
number of elements in one partition. Something like this:

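import com.datastax.spark.connector.cql.CassandraConnector

val sparkContext = ???
val cassandraConnector = CassandraConnector(conf) // conf: your SparkConf
val dataRdd = ??? // I assume this is the source of data
val rddThingById = dataRdd.map(elem => transformToIdByThing(elem))
rddThingById.foreachPartition { partition =>
  // one session per partition, shared by all records in it
  cassandraConnector.withSessionDo { session =>
    partition.foreach { record =>
      // bind values follow the order of the ? markers: the set to append, then the id
      session.execute("update foo set things = things + ? where id = ?",
        java.util.Collections.singleton(record.thing), record.id)
    }
  }
}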
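
If the same update runs for every record, it may be worth preparing the statement once per partition instead of sending the query string each time. A rough sketch of that variant, under a few assumptions: ThingById and appendThings are made-up names for illustration, the record fields are plain strings, and the table name may need a keyspace prefix in your setup:

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

// Hypothetical record type; adapt to whatever transformToIdByThing returns.
case class ThingById(id: String, thing: String)

def appendThings(conf: SparkConf, rdd: RDD[ThingById]): Unit = {
  // The connector is serializable, so the closure below can capture it.
  val connector = CassandraConnector(conf)
  rdd.foreachPartition { partition =>
    connector.withSessionDo { session =>
      // Prepared once per partition, reused for every record in it.
      val stmt = session.prepare("UPDATE foo SET things = things + ? WHERE id = ?")
      partition.foreach { record =>
        // A set<text> value is bound as a java.util.Set.
        val things: java.util.Set[String] =
          java.util.Collections.singleton(record.thing)
        session.execute(stmt.bind(things, record.id))
      }
    }
  }
}
```

The session itself never leaves the executor: only the connector is shipped with the closure, and withSessionDo takes care of obtaining and releasing the session.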

2) You could change your data model slightly in order to avoid the update
operation.
Actually, the Cassandra representation of a set is nothing more than a
column -> timestamp, where the column name is an element of the set.
So Set (a,b,c) = Column(a) -> ts, Column(b) -> ts, Column(c) -> ts
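
To make that concrete, here is a toy model in plain Scala (not connector or Cassandra code; SetModel and its members are names I made up) of a set kept as element -> timestamp. Adding an element is a plain upsert, so no read of the existing value is needed:

```scala
object SetModel {
  // A set stored as: element (the column name) -> write timestamp.
  type SetColumns = Map[String, Long]

  // Adding an element upserts one entry; the newest timestamp wins.
  def add(columns: SetColumns, element: String, ts: Long): SetColumns =
    columns.get(element) match {
      case Some(existing) if existing >= ts => columns
      case _                                => columns + (element -> ts)
    }

  // Reading the set back is just the column names.
  def elements(columns: SetColumns): Set[String] = columns.keySet

  // Apply a batch of (element, timestamp) writes in the given order.
  def applyAll(writes: Seq[(String, Long)]): SetColumns =
    writes.foldLeft(Map.empty[String, Long]) {
      case (acc, (element, ts)) => add(acc, element, ts)
    }

  val writes: Seq[(String, Long)] = Seq(("a", 1L), ("b", 2L), ("a", 3L), ("c", 4L))
  val merged: SetColumns = applyAll(writes)
}
```

Because each write touches only its own entry, applying the same writes in a different order gives the same result in this model, which is why parallel writers do not need to coordinate.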

So, if you desugar your data model, you could use something like:
create table foo (
   id text,
   bar int,
   things text,
   ts timestamp,
   primary key ((id, bar), things)
)

And your Spark process would be reduced to:
import com.datastax.spark.connector._

val sparkContext = ???
val dataRdd = ??? // I assume this is the source of data
dataRdd.map(elem => transformToIdBarThingByTimeStamp(elem))
  .saveToCassandra(ks, "foo", SomeColumns("id", "bar", "things", "ts"))


Hope this helps.

-kr, Gerard.

On Thu, Oct 23, 2014 at 2:48 PM, Ashic Mahtab <as...@live.com> wrote:

> Hi Gerard,
> Thanks for the response. Here's the scenario:
>
> The target cassandra schema looks like this:
>
> create table foo (
>    id text primary key,
>    bar int,
>    things set<text>
> )
>
> The source in question is a SQL Server source providing the necessary
> data. The source goes over the same "id" multiple times adding things to
> the "things" set each time. With inserts, it'll replace "things" with a new
> set of one element, instead of appending that item. As such, the query
>
> update foo set things = things + ? where id=?
>
> solves the problem. If I had to stick with saveToCassandra, I'd have to
> read in the existing row for each row, and then write it back. Since this
> is happening in parallel on multiple machines, that would likely cause
> discrepancies where a node will read and update to older values. Hence my
> question about session management in order to issue custom update queries.
>
> Thanks,
> Ashic.
>
> ------------------------------
> Date: Thu, 23 Oct 2014 14:27:47 +0200
> Subject: Re: Spark Cassandra Connector proper usage
> From: gerard.m...@gmail.com
> To: as...@live.com
>
>
> Ashic,
> With the Spark-cassandra connector you would typically create an RDD from
> the source table, update what you need,  filter out what you don't update
> and write it back to Cassandra.
>
> Kr, Gerard
> On Oct 23, 2014 1:21 PM, "Ashic Mahtab" <as...@live.com> wrote:
>
> I'm looking to use spark for some ETL, which will mostly consist of
> "update" statements (a column is a set, that'll be appended to, so a simple
> insert is likely not going to work). As such, it seems like issuing CQL
> queries to import the data is the best option. Using the Spark Cassandra
> Connector, I see I can do this:
>
>
> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra
>
> Now I don't want to open a session and close it for every row in the
> source (am I right in not wanting this? Usually, I have one session for the
> entire process, and keep using that in "normal" apps). However, it says
> that the connector is serializable, but the session is obviously not. So,
> wrapping the whole import inside a single "withSessionDo" seems like it'll
> cause problems. I was thinking of using something like this:
>
>
> class CassandraStorage(conf:SparkConf) {
>   val session = CassandraConnector(conf).openSession()
>   def store (t:Thingy) : Unit = {
>     //session.execute cql goes here
>   }
> }
>
>
> Is this a good approach? Do I need to worry about closing the session?
> Where / how best would I do that? Any pointers are appreciated.
>
>
> Thanks,
>
> Ashic.
>
>
