Just use the official connector from DataStax 
https://github.com/datastax/spark-cassandra-connector
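
For example, wiring it up could look like this (a minimal sketch – the host,
keyspace and table names are made up, not from this thread):

import com.datastax.spark.connector._            // adds saveToCassandra to RDDs
import com.datastax.spark.connector.streaming._  // and to DStreams
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("portfolio-manager")
  .set("spark.cassandra.connection.host", "127.0.0.1")  // your Cassandra node(s)

// Given e.g. amounts: DStream[(String, Int)] of (userId, amount) pairs and a
// hypothetical portfolio.user_state table:
// amounts.saveToCassandra("portfolio", "user_state", SomeColumns("user_id", "amount"))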

Your solution is very similar. Let’s assume the state is

case class UserState(amount: Int, updates: Seq[Int])

Say your user's current amount is 100. If the user sees no update in a batch, you can emit

Some(UserState(100, Seq.empty))

Otherwise, if updates of +50 and -20 arrive, you can emit

Some(UserState(130, List(50, -20)))
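
Concretely, the update function behind those two examples could look like this
(just a sketch – updatesStream and the String key are my assumptions):

def updateUser(newUpdates: Seq[Int], state: Option[UserState]): Option[UserState] = {
  val prev = state.getOrElse(UserState(0, Seq.empty))
  if (newUpdates.isEmpty)
    Some(prev.copy(updates = Seq.empty))                      // no change this batch
  else
    Some(UserState(prev.amount + newUpdates.sum, newUpdates)) // fold in the deltas
}

// assuming updatesStream: DStream[(String, Int)] of (userId, delta) pairs
val usersState = updatesStream.updateStateByKey(updateUser _)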

You can then process the updates like this

usersState.filter { case (_, state) => state.updates.nonEmpty }.foreachRDD { ... }
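
For instance, the body could push just the changed users to Cassandra (again a
sketch, reusing the imports and the made-up portfolio.user_state table from
above):

usersState
  .filter { case (_, state) => state.updates.nonEmpty }
  .foreachRDD { rdd =>
    rdd.map { case (userId, state) => (userId, state.amount) }
       .saveToCassandra("portfolio", "user_state", SomeColumns("user_id", "amount"))
  }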

Regarding optimizations, I would not worry too much about it. Going through 
users with no updates is most likely a no-op. Spark HAS to iterate through all 
the state objects since it does not operate with deltas from one batch to the 
next – the StateDStream is really the whole app state packed as an RDD.
You could look at one of the other updateStateByKey methods – maybe you can 
write more efficient code there:

def updateStateByKey[S: ClassTag](
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    rememberPartitioner: Boolean
  ): DStream[(K, S)] = …
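
A sketch of how the Iterator variant could be used to keep the no-update path
explicitly cheap (the partition count and updatesStream are my assumptions):

import org.apache.spark.HashPartitioner

def updateAll(it: Iterator[(String, Seq[Int], Option[UserState])]): Iterator[(String, UserState)] =
  it.map {
    case (user, updates, state) if updates.isEmpty =>
      // cheap path: nothing changed, just carry the old state forward
      (user, state.getOrElse(UserState(0, Seq.empty)).copy(updates = Seq.empty))
    case (user, updates, state) =>
      (user, UserState(state.map(_.amount).getOrElse(0) + updates.sum, updates))
  }

val usersState = updatesStream.updateStateByKey(
  updateAll _, new HashPartitioner(8), rememberPartitioner = true)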

What you can do though (and here you'll be glad that Spark also executes the 
code for state objects w/o updates) is to clean up users if they haven't 
received updates for a long time, then load the state from the DB the next time 
you see them. I would consider this a must-have optimization to keep some 
bounds on the memory needs.
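
A minimal sketch of that idea, assuming a lastSeen timestamp in the state and a
made-up idleTimeoutMs (the DB reload on re-sighting is only hinted at in a
comment):

case class TimedState(amount: Int, updates: Seq[Int], lastSeen: Long)

val idleTimeoutMs = 30 * 60 * 1000L  // assumption: evict users idle for 30 min

def updateWithTtl(updates: Seq[Int], state: Option[TimedState]): Option[TimedState] = {
  val now = System.currentTimeMillis()
  state match {
    case Some(s) if updates.isEmpty && now - s.lastSeen > idleTimeoutMs =>
      None                               // returning None evicts the key from the state
    case Some(s) if updates.isEmpty =>
      Some(s.copy(updates = Seq.empty))  // idle but not expired: keep as-is
    case other =>
      // new key, or first sighting after eviction – this is where you would
      // reload the base amount from the DB
      Some(TimedState(other.map(_.amount).getOrElse(0) + updates.sum, updates, now))
  }
}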

-adrian

From: Thúy Hằng Lê
Date: Friday, September 25, 2015 at 2:05 PM
To: Adrian Tanase
Subject: Re: Using Spark for portfolio manager app


Hi Adrian,

Thanks, Cassandra seems to be a good candidate too. I will give it a try.
Do you know of any stable connector that helps Spark work with Cassandra, or 
should I write one myself?

Regarding my second question, I think I've figured out another solution: I will 
append a flag (like isNew) to the tuple in the updateStateByKey function, then 
use a filter to know which records I should update in the database.
But it would be great if you could share your solution too (I don't quite get 
the idea of emitting a new tuple).

In addition, regarding Spark's design, it seems it has to iterate over all keys 
(including ones that haven't changed) to do the aggregation for each batch. For 
my use case I have 3M keys, but only 2-3K change in each batch (every 1 second); 
is there any way to optimize this process?

On Sep 25, 2015 4:12 PM, "Adrian Tanase" 
<atan...@adobe.com> wrote:
Re: DB I strongly encourage you to look at Cassandra – it's almost as powerful 
as HBase, and a lot easier to set up and manage. It's well suited for this type 
of use case, with a combination of K/V store and time series data.

For the second question, I’ve used this pattern all the time for “flash 
messages” - passing info as a 1 time message downstream:

  *   In your updateStateByKey function, emit a tuple of (actualNewState, 
changedData)
  *   Then filter this on !changedData.isEmpty or something
  *   And only do foreachRDD on the filtered stream (see the sketch below).
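
A compact sketch of the pattern, with made-up names (stream is assumed to be a
DStream[(String, Int)] of (key, delta) pairs):

case class Flash(value: Int, changedData: Seq[Int])  // changedData = the flash message

def update(deltas: Seq[Int], prev: Option[Flash]): Option[Flash] =
  Some(Flash(prev.map(_.value).getOrElse(0) + deltas.sum, deltas))

stream.updateStateByKey(update _)
  .filter { case (_, f) => f.changedData.nonEmpty }  // keep only touched keys
  .foreachRDD { rdd =>
    rdd.foreach(println)  // placeholder: write only these changed records to the DB
  }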

Makes sense?

-adrian

From: Thúy Hằng Lê
Date: Friday, September 25, 2015 at 10:31 AM
To: ALEX K
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Re: Using Spark for portfolio manager app


Thanks all for the feedback so far.
I haven't decided which external storage will be used yet.
HBase is cool but it requires Hadoop in production. I only have 3-4 servers for 
the whole thing (I am thinking of a relational database for this; it could be 
MariaDB, MemSQL or MySQL), but they are hard to scale.
I will try various approaches before making any decision.

In addition, using Spark Streaming, is there any way to update only new data in 
external storage after using updateStateByKey?
The foreachRDD function seems to loop over all records (including ones that 
haven't changed). I believe Spark Streaming must have a way to do this, but I 
still couldn't find an example doing a similar job.
