Hi everyone! We are just starting a new project with Spark and have a few (newbie) questions. Any help would be greatly appreciated!
* First, a bit of context: we are working with CSV files of about 400M lines, where each line represents a record (a transaction) from customer_A to customer_B, with around 20 fields (transaction duration, value, type, ...). We would like to use Spark to extract several hundred simple variables for each customer.

We have created a Record class to represent the records and to ease their manipulation (filtering on fields, manipulating dates, ...):

    val lines = sc.textFile("hdfs://data.csv")
    val data: RDD[Record] = lines.map(Record(_))

where Record.apply(String) splits the CSV line and fills the fields accordingly.

Now, given a set of (field, value) pairs, we want to compute a variable for each customer_A over the records that match all of the given fields and values:

    // Filter the initial RDD on the given fields and values
    val filteredData = data.filter(_.check(field1, value1))
                           .filter(_.check(field2, value2))
                           .filter(...)

    // Since we will later want to filter on other combinations of fields
    // and values, we would like to persist these RDDs in memory. For instance:
    val filteredData2 = data.filter(_.check(field1, value1))
                            .filter(_.check(field3, value3))
    // should benefit from the RDD already filtered on (field1, value1).
    // Where should we put the cache: in between each filter, at the
    // beginning, or at the end?

We then compute the variable of interest, grouped by customer. The formulation is generic, using three functions f, g and h, and the output is always an RDD of (customerA, variable) pairs:

    val variable = filteredData.map(x => (x.customerA, f(x)))
                               .reduceByKey(g)
                               .map(h)

For example, to compute the number of records for each customer_A with field1 = value1, we define:

    def f(x: Record) = 1
    def g(x: Int, y: Int) = x + y
    def h(p: (String, Int)) = p

Another example: to compute the number of distinct customer_B involved in records with each customer_A, we define:

    def f(x: Record) = Set(x.customerB)  // singleton set, so g can take unions
    def g(x: Set[String], y: Set[String]) = x ++ y
    def h(p: (String, Set[String])) = (p._1, p._2.size)

* Now, my questions ;)

- SERIALIZATION:
  - Is it OK to manipulate RDDs of Record objects, or should we stick with plain RDDs of strings and do all the splitting and computation inside each transformation? How can we make that efficient in terms of memory and speed? I've read the tuning documentation (including the part on Kryo serialization), but I'd like to have more details on this point.

- PERFORMANCE:
  - Is it a good idea to perform all the filters first and then the groupBy on customer, or should we do the reverse? In the latter case, how can we filter on the values? I didn't see a filterValues method in the PairRDD API.

Thanks for any help you can bring us!
Pierre
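P.S. In case it makes the serialization question easier to answer, here is a boiled-down sketch of our setup: a Record with only 4 invented field names standing in for our real ~20, plus the Kryo registration we pieced together from the tuning guide (we are not sure this registration is sufficient, or even the right way to do it):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    // Boiled-down Record: 4 invented fields instead of our real ~20
    case class Record(customerA: String, customerB: String, txType: String, value: Double) {
      // check(field, value): does the named field hold the given value?
      def check(field: String, v: String): Boolean = field match {
        case "customerA" => customerA == v
        case "customerB" => customerB == v
        case "txType"    => txType == v
        case "value"     => value.toString == v
        case _           => false
      }
    }

    object Record {
      // Record.apply(line): split the CSV line and fill the fields
      def apply(line: String): Record = {
        val t = line.split(",", -1)
        Record(t(0), t(1), t(2), t(3).toDouble)
      }
    }

    // Kryo serialization, as we understood it from the tuning guide
    val conf = new SparkConf()
      .setAppName("customer-variables")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Record]))
    val sc = new SparkContext(conf)

    val data: RDD[Record] = sc.textFile("hdfs://data.csv").map(Record(_))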
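For the caching question, the placement we currently have in mind (reusing the data RDD and the field/value placeholders from above) is to persist the common prefix of the two filter chains once, and derive both filtered RDDs from it; whether MEMORY_ONLY is a sensible storage level for a dataset this size is part of what we are unsure about:

    import org.apache.spark.storage.StorageLevel

    // Persist the shared (field1, value1) filter once, so it is not
    // re-evaluated against the full 400M-line dataset for each variable
    val byField1 = data.filter(_.check(field1, value1))
                       .persist(StorageLevel.MEMORY_ONLY)

    val filteredData  = byField1.filter(_.check(field2, value2))
    val filteredData2 = byField1.filter(_.check(field3, value3))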
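And for the performance question, the group-first alternative we were imagining looks like the sketch below; since we did not find a filterValues on pair RDDs, we would filter inside mapValues. This is only our guess at the idiom, not something we have benchmarked:

    import org.apache.spark.SparkContext._  // pair-RDD functions (older Spark versions)

    // Group all records by customer_A once; this shuffles the whole dataset,
    // which is precisely what makes us doubt the group-first approach
    val grouped = data.groupBy(_.customerA)

    // Example: number of distinct customer_B per customer_A among the
    // records matching (field1, value1)
    val nbDistinctB = grouped.mapValues { recs =>
      recs.filter(_.check(field1, value1)).map(_.customerB).toSet.size
    }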