Dear fellow Sparkers, I am a new Spark user and I am trying to solve a (conceptually simple) problem which may not be a good use case for Spark, at least for the RDD API. But before I turn my back on it, I would rather have the opinion of more knowledgeable developers than me, as it is highly likely that I am missing something.
Here is my problem in a nutshell. I have numerous files in which each line is an event of the form:

  (subscribe|unsubscribe),<list_id>,<user_id>,<timestamp>

I need to gather time-framed (for example, weekly or monthly) statistics of the kind:

  <list_id>, <num_current_users>, <total_num_users_who_joined_from_dawn_of_time>, <total_num_events_from_dawn_of_time>

Ideally, I would like a single Spark job that outputs these statistics for all time periods at once. The number of unique <list_id> values is a few hundred; the number of unique <user_id> values is a few tens of millions.

The trouble is that the data is not "clean": I can have 'unsubscribe' events for users that are not subscribed, or 'subscribe' events for users that are already subscribed. This means I have to keep, for every (list_id, user_id) pair, only the event with the most recent <timestamp>.

If one is only interested in the final statistics, this is relatively easy to do with reduceByKey and combineByKey on a properly keyed RDD containing all events (see the first sketch at the end of this message). However, I am struggling with the "partial" per-period statistics: I obviously do not want to redo most of the computations already done for period (i-1) when I add the events for period (i), which is exactly what my reduceByKey/combineByKey approach would end up doing.

Sequentially, the problem is trivial: keep the latest 'valid' event for each (list_id, user_id) pair in a huge hash table, use it to decide whether to increment or decrement <num_current_users> (for example), and save a snapshot of the running statistics whenever we are done with period (i) (see the second sketch at the end of this message).

I do not know how to solve this efficiently in Spark, though. A naive idea would be to load the data for period (0) into an explicitly partitioned RDD (for example, partitioned on the last few characters of <user_id>) and proceed sequentially within a call to mapPartitionsWithIndex. The trouble would then be how to process the data files for the later periods. Suppose I store the event RDDs in an array 'data' (indexed by period number), all partitioned the same way; I am afraid something like the following is not viable (please excuse the pseudo-code):

data(0).mapPartitionsWithIndex( (index, iterator) => {
  //
  // 1. Initialize a 'hashmap' keyed by (list_id, user_id) for this partition.
  //
  val hashmap = new HashMap[(String, String), Event]

  //
  // 2. Iterate over the events of data(0), update 'hashmap', and
  //    output the stats for this partition and period.
  //
  while (iterator.hasNext) {
    //
    // Process the entry, update 'hashmap', output the stats
    // for this partition and period.
    //
  }

  //
  // 3. Loop over all the later periods.
  //
  for (period <- 1 until max) {
    val next = data(period).mapPartitionsWithIndex( (index2, iterator2) => {
      if (index2 == index) {
        while (iterator2.hasNext) {
          //
          // Iterate over the elements of 'next' (since the data should
          // already be on the same node after the initial partitioning,
          // so no shuffling, right?), update 'hashmap', and output the
          // stats for this partition and period.
          //
        }
      } else {
        iterator2
      }
    })
  }
})

The trouble with this approach is that I am afraid the data files for period (i > 0) will be read as many times as there are partitions in data(0), unless I explicitly persist them, which is inefficient. That said, there is probably a (clumsy) way to unpersist them once I am sure I am 100% done with them. All of this looks not only inelegant but shamefully un-Spark-like to me.
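For concreteness, here is roughly what I mean by the reduceByKey approach for the final, all-time statistics. This is a heavily simplified sketch; the Event case class and its field names are just shorthand for this message:

import org.apache.spark.rdd.RDD

// Shorthand event type for this message; field names are illustrative only.
case class Event(action: String, listId: String, userId: String, ts: Long)

def finalStats(events: RDD[Event]): RDD[(String, (Long, Long, Long))] = {
  // <total_num_events_from_dawn_of_time> per list.
  val totalEvents = events.map(e => (e.listId, 1L)).reduceByKey(_ + _)

  // <total_num_users_who_joined_from_dawn_of_time>: distinct users that
  // subscribed at least once, per list.
  val everJoined = events
    .filter(_.action == "subscribe")
    .map(e => (e.listId, e.userId))
    .distinct()
    .map { case (listId, _) => (listId, 1L) }
    .reduceByKey(_ + _)

  // <num_current_users>: keep only the most recent event per (list_id, user_id)
  // pair, so the 'dirty' duplicate events do not skew the count, then count
  // the users whose latest event is a subscription.
  val currentUsers = events
    .map(e => ((e.listId, e.userId), e))
    .reduceByKey((a, b) => if (a.ts >= b.ts) a else b)
    .filter { case (_, e) => e.action == "subscribe" }
    .map { case ((listId, _), _) => (listId, 1L) }
    .reduceByKey(_ + _)

  currentUsers.join(everJoined).join(totalEvents).map {
    case (listId, ((current, joined), total)) => (listId, (current, joined, total))
  }
}

(In practice the counters could be folded into a single combineByKey pass, and lists with no current subscribers would need outer joins, but this shows the idea: it works fine for the final numbers, yet recomputes everything from scratch for every period.)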
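And here is a sketch of the sequential, single-machine logic I would like to reproduce per period. It reuses the Event shorthand above, assumes the events are processed in timestamp order, and the periodOf function (mapping a timestamp to a period number) is made up for the example:

import scala.collection.mutable

// Per-list running counters (again, the names are just for this sketch).
case class Stats(var current: Long = 0L, var joined: Long = 0L, var events: Long = 0L)

def sequentialStats(events: Iterator[Event], periodOf: Long => Int): Map[(Int, String), Stats] = {
  val lastAction = mutable.HashMap.empty[(String, String), String]  // latest valid action per (list_id, user_id)
  val stats      = mutable.HashMap.empty[String, Stats]             // running counters per list_id
  val snapshots  = mutable.HashMap.empty[(Int, String), Stats]      // counters at the end of each period
  var period     = -1

  def snapshot(): Unit =
    if (period >= 0) stats.foreach { case (listId, s) => snapshots((period, listId)) = s.copy() }

  for (e <- events) {
    val p = periodOf(e.ts)
    if (p != period) { snapshot(); period = p }    // period boundary: save the running stats

    val s = stats.getOrElseUpdate(e.listId, Stats())
    s.events += 1
    val prev = lastAction.get((e.listId, e.userId))
    e.action match {
      case "subscribe" if !prev.contains("subscribe") =>
        if (prev.isEmpty) s.joined += 1            // first valid subscription ever seen for this user
        s.current += 1
        lastAction((e.listId, e.userId)) = "subscribe"
      case "unsubscribe" if prev.contains("subscribe") =>
        s.current -= 1
        lastAction((e.listId, e.userId)) = "unsubscribe"
      case _ =>                                    // 'dirty' event (duplicate subscribe, unsubscribe of a non-member): ignore
    }
  }
  snapshot()                                       // stats at the end of the last period
  snapshots.toMap
}

This is exactly the hash-table-plus-snapshots idea described above; the question is how to reproduce it in Spark without recomputing everything for every period.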
Am I missing a trick here, maybe a well-known pattern? Are RDDs simply not the most appropriate API for this kind of task? If so, what would you suggest I look into? Thank you for taking the time to read this overly long message ;-) Jeroen