Hi everyone!

We are just starting a new project with Spark and have a few (newbie) questions.
Any help would be greatly appreciated!

* First a bit of context:

We are working with CSV files of about 400M lines, in which each line
represents a record (transaction) from customer_A to customer_B, with
around 20 fields (transaction duration, value, type, ...).
We would like to use Spark to extract several hundred simple variables
for each customer.

We have created a Record class to represent the records and ease their
manipulation (filtering on fields, manipulating dates, ...):

        val f = sc.textFile("hdfs://data.csv")
        val data: RDD[Record] = f.map(Record(_))

where Record.apply(String) splits each CSV line and fills the fields
accordingly.
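
For reference, a stripped-down sketch of what we have (the field names,
the CSV layout, and the body of check are illustrative placeholders;
the real class has ~20 fields):

        import org.apache.spark.rdd.RDD   // for the RDD type annotation

        // Illustrative sketch only: field names and CSV layout are
        // placeholders for the real ~20-field class
        case class Record(customerA: String,
                          customerB: String,
                          duration: Long,
                          value: Double,
                          txType: String) {
          // check(field, value): does the named field hold this value?
          def check(field: String, v: String): Boolean = field match {
            case "customerA" => customerA == v
            case "customerB" => customerB == v
            case "txType"    => txType == v
            case _           => false
          }
        }

        object Record {
          // Build a Record from one CSV line (no quoting/escaping handled)
          def apply(line: String): Record = {
            val cols = line.split(",", -1)
            Record(cols(0), cols(1), cols(2).toLong, cols(3).toDouble,
                   cols(4))
          }
        }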

Now, given a set of (field, value) pairs, we want to compute a variable
for each customer_A, over the records that match all of those pairs:

        // Filter the initial RDD on the given fields and values
        val filteredData = data.filter(_.check(field1, value1))
                               .filter(_.check(field2, value2))
                               .filter(...)
        // Since we will later want to filter on other combinations of
        // fields and values, we would like to persist these RDDs in
        // memory. For instance:
        val filteredData2 = data.filter(_.check(field1, value1))
                                .filter(_.check(field3, value3))
        // should benefit from the RDD already filtered on (field1, value1).
        // Where should we put the cache? Between each filter, at the
        // beginning, or at the end?
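
Our understanding is that Spark will not reuse filteredData's work for
filteredData2 automatically; it only reuses an RDD that we cache and
then reference explicitly. So one arrangement we are considering (just
a sketch of our current guess, not a recommendation):

        // Sketch: cache the parsed RDD, and cache the shared prefix of
        // the filter chain so that both derived RDDs reuse it
        val data: RDD[Record] =
          sc.textFile("hdfs://data.csv").map(Record(_)).cache()

        val byField1 = data.filter(_.check(field1, value1)).cache()

        val filteredData  = byField1.filter(_.check(field2, value2))
        val filteredData2 = byField1.filter(_.check(field3, value3))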


        // Compute the variable of interest, grouped by customer.
        // The formulation is generic, using functions f, g and h.
        // The output is always an RDD of pairs (customerA, variable)
        val variable = filteredData.map(x => (x.customerA, f(x)))
                                   .reduceByKey(g)
                                   .map { case (k, v) => h(k, v) }
        
        // For example, to compute the number of records for each
        // customer_A with field1 = value1, we define:
        def f(x: Record) = 1
        def g(x: Int, y: Int) = x + y
        def h(x: String, y: Int) = (x, y)

        // Another example: to compute the number of distinct customer_Bs
        // involved in records with each customer_A, we define:
        def f(x: Record) = Set(x.customerB)
        def g(x: Set[String], y: Set[String]) = x ++ y
        def h(x: String, y: Set[String]) = (x, y.size)
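
Putting the second example together end to end (a sketch, assuming the
Record fields above; mapValues keeps the customer key untouched in the
last step):

        // Sketch: number of distinct customer_Bs per customer_A,
        // computed over the filtered records
        val distinctPartners: RDD[(String, Int)] =
          filteredData.map(x => (x.customerA, Set(x.customerB)))
                      .reduceByKey(_ ++ _)
                      .mapValues(_.size)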


* Now, my questions ;)

- SERIALIZATION:
        - Is it OK to manipulate RDDs of Record objects, or should we
stick with simple RDDs of strings and do all the splitting and
computation in each transformation?
        How can we make that efficient in terms of memory and speed?
        I've read the Tuning guide (including the Kryo serialization
section) but I'd like to have more info on that...
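
For the Kryo part, our understanding from the Tuning guide is that we
would switch the serializer and register Record, roughly like this (a
sketch, assuming Spark 1.2+ where SparkConf.registerKryoClasses exists):

        import org.apache.spark.{SparkConf, SparkContext}

        // Sketch, following the Tuning guide: use Kryo and register
        // Record so instances serialize compactly
        val conf = new SparkConf()
          .setAppName("transactions")
          .set("spark.serializer",
               "org.apache.spark.serializer.KryoSerializer")
          .registerKryoClasses(Array(classOf[Record]))
        val sc = new SparkContext(conf)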

- PERFORMANCE:
        - Is it a good idea to perform all the filters first and then
group by customer, or should we do the reverse?
        In the second case, how can we filter on the values? I didn't
see a filterValues method in the PairRDD API.
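
To clarify what we mean by the reverse order: group everything by
customer first, then filter inside each group. The closest we found is
filtering inside mapValues; is this the right approach, given that it
shuffles every record? A sketch:

        // Sketch of the "group first" order: shuffle all records once,
        // then filter within each customer's records via mapValues
        val grouped: RDD[(String, Iterable[Record])] =
          data.map(x => (x.customerA, x)).groupByKey()

        // e.g. the count of matching records per customer:
        val counts: RDD[(String, Int)] = grouped.mapValues { recs =>
          recs.count(r => r.check(field1, value1) && r.check(field2, value2))
        }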


Thanks for any help you can give us!


Pierre
