Dear fellow Sparkers,

I am a new Spark user and I am trying to solve a (conceptually simple)
problem which may not be a good use case for Spark, at least for the RDD
API. But before I turn my back on it, I would rather have the opinion of
more knowledgeable developers than me, as it is highly likely that I am
missing something.

Here is my problem in a nutshell.

I have numerous files where each line is an event of the form:

    (subscribe|unsubscribe),<list_id>,<user_id>,<timestamp>

I need to gather time-framed (for example, weekly or monthly) statistics of
the kind:

  <list_id>,
  <num_current_users>,
  <total_num_users_who_joined_from_dawn_of_time>,
  <total_num_events_from_dawn_of_time>
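
For concreteness, here is roughly how I represent these records (the names
are only illustrative):

    // One input line: (subscribe|unsubscribe),<list_id>,<user_id>,<timestamp>
    case class Event(action: String, listId: String, userId: String, ts: Long)

    // The per-list statistics I want for each time period.
    case class ListStats(
        listId: String,
        numCurrentUsers: Long,
        totalUsersEver: Long,
        totalEventsEver: Long)

    def parseEvent(line: String): Event = {
        val Array(action, listId, userId, ts) = line.split(",", 4)
        Event(action, listId, userId, ts.toLong)
    }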

Ideally, I would need a Spark job that outputs these statistics for all time
periods at once. The number of unique <list_id> values is about a few
hundred; the number of unique <user_id> values is a few tens of millions.

The trouble is that the data is not "clean", in the sense that I can have
'unsubscribe' events for users that are not subscribed, or 'subscribe'
events for users that are already subscribed.

This means I have to keep in memory the complete list of

    (subscribe|unsubscribe),<list_id>,<user_id>,<timestamp>

keeping only the entry with the most recent <timestamp> for each
(list_id, user_id) pair.

If one is only interested in the final statistics, this is relatively easy
to do with reduceByKey and combineByKey on a properly keyed RDD containing
all the events.
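
Something along these lines, for instance (a simplified sketch that only
computes <num_current_users>; the other counters would be similar):

    import org.apache.spark.rdd.RDD

    def finalStats(events: RDD[Event]): RDD[(String, Long)] = {
        // Keep only the most recent event per (list_id, user_id) pair.
        val latest = events
            .map(e => ((e.listId, e.userId), e))
            .reduceByKey((a, b) => if (a.ts >= b.ts) a else b)

        // Count the users whose latest event is a 'subscribe', per list.
        latest
            .filter { case (_, e) => e.action == "subscribe" }
            .map { case ((listId, _), _) => (listId, 1L) }
            .reduceByKey(_ + _)
    }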

However, I am struggling when it comes to computing the "partial"
statistics: I obviously do not want to redo most of the computation already
done for period (i-1) when I add the events for period (i), which is exactly
what my reduceByKey/combineByKey approach leads to.

Sequentially, the problem is trivial: keep the latest 'valid' event for each
(list_id, user_id) pair in a huge hash table, use it to decide whether to
increment or decrement <num_current_users> (for example), and save a
snapshot of the current statistics whenever we are done with period (i).
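
By that I mean, roughly (single machine, assuming the events are processed
in timestamp order; the period bookkeeping is omitted):

    import scala.collection.mutable

    val state        = mutable.HashMap.empty[(String, String), Event]
    val currentUsers = mutable.HashMap.empty[String, Long].withDefaultValue(0L)

    def process(e: Event): Unit = {
        val key = (e.listId, e.userId)
        val wasSubscribed = state.get(key).exists(_.action == "subscribe")
        state(key) = e
        (wasSubscribed, e.action) match {
            case (false, "subscribe")  => currentUsers(e.listId) += 1
            case (true, "unsubscribe") => currentUsers(e.listId) -= 1
            case _                     => // redundant event: counters unchanged
        }
        // At each period boundary: snapshot currentUsers (and the other
        // counters) as the statistics for that period.
    }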

I do not know how to efficiently solve this in Spark though.

A naive idea would be to load the data for period 0 into an explicitly
partitioned RDD (partitioned, for example, according to the last few
characters of <user_id>) and proceed in a sequential fashion within a call
to mapPartitions.
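
By explicit partitioning I mean something like this (hashing the full
<user_id> here for simplicity; the names and the number of partitions are
just placeholders):

    import org.apache.spark.HashPartitioner

    val partitioner = new HashPartitioner(64)

    // Key the period-0 events by user_id and partition them explicitly, so
    // that all events of a given user land in the same partition.
    val data0 = rawEvents0                      // RDD[Event] for period 0
        .map(e => (e.userId, e))
        .partitionBy(partitioner)
        .persist()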

The trouble would then be how to process the new data files for later
periods. Suppose I store the event RDDs in an array 'data' (indexed by
period number), all of them partitioned the same way. I am afraid something
like this is not viable (please excuse the pseudo-code):

    data(0).mapPartitionsWithIndex(
        (index, iterator) => {
            //
            // 1. Initialize 'hashmap', keyed by (list_id, user_id), for
            //    this partition.
            //
            val hashmap = new HashMap[(String, String), Event]

            //
            // 2. Iterate over the events of data(0), update 'hashmap',
            //    output stats for this partition and period 0.
            //
            while (iterator.hasNext) {
                //
                // Process entry, update 'hashmap', output stats
                // for the partition and period.
                //
            }

            //
            // 3. Loop over all the later periods.
            //
            for (period <- 1 until max) {
                val next = data(period).mapPartitionsWithIndex(
                    (index2, iterator2) => {
                        if (index2 == index) {
                            while (iterator2.hasNext) {
                                //
                                // Iterate over the elements of this
                                // partition (since the data should be on
                                // the same node, so no shuffling after
                                // the initial partitioning, right?),
                                // update 'hashmap', and output stats for
                                // this partition and period.
                                //
                            }
                        } else {
                            iterator2
                        }
                    }
                )
            }
        }
    )

The trouble with this approach is that I am afraid the data files for
period (i > 0) will be read as many times as there are partitions in
data(0), which is inefficient, unless perhaps I explicitly persist them.
That said, there is probably a (clumsy) way to unpersist them once I am
sure I am 100% done with them.
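
In code, I suppose that would be something along these lines:

    import org.apache.spark.storage.StorageLevel

    // Cache the events for period i before the pass that consumes them...
    data(period).persist(StorageLevel.MEMORY_AND_DISK)

    // ...and release them once the statistics for that period are out.
    data(period).unpersist()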

All of this looks not only inelegant but also shamefully un-Spark-like to me.

Am I missing a trick here, maybe a well-known pattern? Are RDDs not the
most appropriate API for this kind of task? If so, what would you suggest
I look into?

Thank you for taking the time to read that overly long message ;-)

Jeroen
