See inline

On Fri, Feb 14, 2014 at 5:57 AM, Pierre Borckmans <
pierre.borckm...@realimpactanalytics.com> wrote:

>
> Hi everyone!
>
> We are just starting a new project with Spark and have a few (newbie)
> questions.
> Any help would be greatly appreciated!
>
> * First a bit of context:
>
> We are working on CSV files, about 400M lines, in which each line
> represents a record (transaction) from customer_A to customer_B, with
> around 20 fields (transaction duration, value, type, ...).
> We would like to use spark to extract several hundreds of simple variables
> for each customer.
>
> We have created a Record class to represent the records, and ease their
> manipulation (filtering on fields, manipulating dates, ...)
>         val f = sc.textFile("hdfs://data.csv")
>         val data: RDD[Record] = f.map(Record(_))
> where Record.apply(String) is splitting the csv strings and filling the
> fields accordingly
>
> Now given a set of (field, value), we want to compute a variable for each
> customer_A, on records that respect the set of fields and values:
>
>         // Filter the initial RDD on the given fields and values
>         val filteredData = data.filter(_.check(field1,
> value1)).filter(_.check(field2, value2)).filter(...)
>         // Since we will want to filter later on other combinations of
> fields and values,
>         // we would like to persist these rdd in memory. For instance:
>         val filteredData2 = data.filter(_.check(field1,
> value1)).filter(_.check(field3, value3))
>         // should benefit from the RDD already filtered on field1, value1
>         // where should we put the cache? in between each filter / at the
> beginning / at the end ?
>

You should put your .cache() *after* the creation of the RDD you plan to
re-use. Cache the data you're going to use over and over again.

>
>
>         // Compute the variable of interest, grouped by customer
>         // The formulation is generic, using functions f, g and h
>         // The output is always a rdd of pairs (customerA, variable)
>         val variable = filteredData.map( x => ( x.customerA, f( x ) )
> ).reduceByKey( g( _, _ ) ).map( h( _ ) )


>         // For example, if we want to compute the number of records for
> each customer_A with field1=value1, we define:
>         def f(x) = 1
>         def g(x,y) = x+y
>         def h(x,y) = (x,y)
>
>         // Another example, if we want to compute the number of distinct
> customer_B are involved in records with customer_A, we define:
>         def f(x) = x.customerB
>         def g(x,y) =  x ++ y
>         def h(x,y) = ( x, y.size )
>
>
> * Now, my questions ;)
>
> - SERIALIZATION:
>         - Is it OK to manipulate RDDs of Record objects or should we stick
> with simple RDDs of strings, and do all the splitting and computation in
> each transformation ?
>         How to make that efficient in terms of memory and speed?
>         I've read the docs about the Tuning (and Kryo serialization) but
> I'd like to have more info on that...
>
RDD of Record is OK as long as Record is serializable. Kryo may help you
here but I'm not sure how you're representing Record so I'm not sure how
much. Either way, it's usually best to get it working first and then worry
about serialization/performance later.

>
> - PERFORMANCE:
>         - Is it a good idea to perform all the filters first, and then the
> groupBy customer, or should we do the reverse?
>
Yes, performing filters first is a good idea. Spark doesn't do heavy (SQL
style) query optimization where filters are pushed down early in the DAG,
etc. If your queries look a lot like database queries, you might think
about using Shark for this - it'll save you some pain at
setup/serialization and may be much faster if your queries only operate on
a few columns at a time.

>         In the second situation, how can we filter on the values? I didn't
> see a filterValues method in the PairRDD API ?
>
You can do a .filter() on the value of a pairRDD like this

prdd.filter(_._2 == "something")
//or
prdd.filter(r => f(r._2))

>
>
> Thanks for any help you can bring us!
>
>
> Pierre
>
>

Reply via email to